Yes, if you're planning on running compaction anyway, you may want to use
the fanout writers. I still don't recommend them because they use a ton of
memory to buffer Parquet data and create more data files. Up to you, though.
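
Roughly, that combination could look like the sketch below. This is only an
illustration; the catalog, table, and function names are placeholders, not
from your setup:

def write_batch(batch_df, batch_id):
    # "fanout-enabled" is a per-write option; it overrides the table's
    # write.spark.fanout.enabled property for this write only
    batch_df.writeTo("db.events").option("fanout-enabled", "true").append()

df.writeStream.foreachBatch(write_batch).start()

# then compact the resulting small files periodically, e.g. once a day
spark.sql("""
    CALL my_catalog.system.rewrite_data_files(
        table => 'db.events',
        options => map('target-file-size-bytes', '536870912')
    )
""")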

On Fri, Sep 8, 2023 at 8:07 AM Nirav Patel <nira...@gmail.com> wrote:

> Thanks Ryan for responding.
> I think the Iceberg streaming-writes documentation recommends using fanout
> writers for streaming:
> https://iceberg.apache.org/docs/latest/spark-structured-streaming/#writing-against-partitioned-table
> Regarding too many files: that can happen regardless of fanout or of
> presorting the dataframe, right? For example, I am ingesting event data and
> writing it to a table partitioned by date(ts), so each day maps to a single
> partition. But since I ingest and write every 5 minutes, that's about 288
> micro-batch commits a day (12 per hour x 24 hours), so by the end of the
> day I will have well over 200 files per partition. It seems I will have to
> run compaction regardless.
>
> Best
> Nirav
>
> On Thu, Aug 31, 2023 at 8:59 AM Ryan Blue <b...@tabular.io> wrote:
>
>> We generally don't recommend fanout writers because they create lots of
>> small data files. It also isn't clear why the table's partitioning isn't
>> causing Spark to distribute the data properly -- maybe you're using an old
>> Spark version?
>>
>> In any case, you can distribute the data yourself to align with the
>> table's partitioning. You can either use `.repartition` with the DataFrame
>> API, or you can sort. I recommend sorting by adding an `ORDER BY` clause
>> to your SQL.
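>>
>> As a rough sketch of the DataFrame route (assuming batch_df is the
>> micro-batch dataframe from foreachBatch; the table and column names here
>> are made up):
>>
>> from pyspark.sql.functions import col, to_date
>>
>> # distribute rows by the table's partition value, then sort within each
>> # task so records arrive at the writer clustered by partition
>> clustered = (batch_df
>>     .repartition(to_date(col("ts")))
>>     .sortWithinPartitions(to_date(col("ts"))))
>> clustered.writeTo("db.events").append()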
>>
>> Ryan
>>
>> On Wed, Aug 30, 2023 at 10:36 AM Nirav Patel <nira...@gmail.com> wrote:
>>
>>> Should I try the "fanout-enabled" option within the foreachBatch method
>>> where I do dataframe.write?
>>>
>>> On Wed, Aug 30, 2023 at 10:29 AM Nirav Patel <nira...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using Spark Structured Streaming with a foreachBatch sink to append
>>>> to an Iceberg table that has two hidden partition columns.
>>>> I got this infamous error about the input dataframe needing to be
>>>> clustered by partition:
>>>>
>>>> *Incoming records violate the writer assumption that records are
>>>> clustered by spec and by partition within each spec. Either cluster the
>>>> incoming records or switch to fanout writers.*
>>>>
>>>> I tried setting "fanout-enabled" to "true" before calling foreachBatch,
>>>> but it didn't work at all. I got the same error.
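>>>>
>>>> To be concrete, this is roughly what I tried (simplified; the table name
>>>> is a placeholder):
>>>>
>>>> def write_batch(batch_df, batch_id):
>>>>     batch_df.write.format("iceberg").mode("append").save("db.events")
>>>>
>>>> query = (df.writeStream
>>>>     .option("fanout-enabled", "true")  # set here, before foreachBatch
>>>>     .foreachBatch(write_batch)
>>>>     .start())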
>>>>
>>>> I tried partitionedBy(days("date"), col("customerid")) and that didn't
>>>> work either.
>>>>
>>>> Then I used the Spark SQL approach:
>>>>
>>>> INSERT INTO {dest_schema_fqn}
>>>> SELECT * FROM {success_agg_tbl} ORDER BY date(date), tenant
>>>>
>>>> and that worked.
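>>>>
>>>> For context, that runs inside foreachBatch roughly like this (simplified;
>>>> {dest_schema_fqn} and {success_agg_tbl} are my own config placeholders):
>>>>
>>>> def write_batch(batch_df, batch_id):
>>>>     # register the micro-batch so the SQL statement can see it
>>>>     batch_df.createOrReplaceTempView(success_agg_tbl)
>>>>     spark.sql(f"""
>>>>         INSERT INTO {dest_schema_fqn}
>>>>         SELECT * FROM {success_agg_tbl} ORDER BY date(date), tenant
>>>>     """)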
>>>>
>>>> I know of the following table-level configs:
>>>> write.spark.fanout.enabled - false
>>>> write.distribution-mode - none
>>>> but I have left them at their defaults, as I assume the write option will
>>>> override those settings.
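>>>>
>>>> I believe those could also be set at the table level with something like
>>>> the following (untested on my end; the table name is a placeholder):
>>>>
>>>> spark.sql("""
>>>>     ALTER TABLE db.events SET TBLPROPERTIES (
>>>>         'write.spark.fanout.enabled' = 'true',
>>>>         'write.distribution-mode' = 'hash'
>>>>     )
>>>> """)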
>>>>
>>>> So, does the "fanout-enabled" option have an effect when used with
>>>> foreachBatch? (I'm new to Spark streaming as well.)
>>>>
>>>> thanks
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Tabular
>>
>

-- 
Ryan Blue
Tabular
