We generally don't recommend fanout writers because they create lots of
small data files. It also isn't clear why the table's partitioning isn't
causing Spark to distribute the data properly -- maybe you're using an old
Spark version?

In any case, you can distribute the data yourself to align with the table's
partitioning. You can either use `.repartition` with the DataFrame API, or
you can sort. I recommend sorting by adding an `ORDER BY` clause to your
SQL.
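
For example, here is a minimal PySpark sketch of both approaches inside
foreachBatch. The table name ("db.events") is hypothetical, and the column
names come from your messages below; adjust them to your schema:

    from pyspark.sql import functions as F

    def write_batch(batch_df, batch_id):
        # Cluster incoming records to match the table's partitioning,
        # (days(date), customerid). to_date approximates the days()
        # transform at day granularity.
        cols = [F.to_date("date"), F.col("customerid")]
        clustered = (batch_df
                     .repartition(*cols)
                     .sortWithinPartitions(*cols))
        # Or do a global sort, equivalent to ORDER BY in SQL:
        # clustered = batch_df.orderBy(*cols)
        clustered.writeTo("db.events").append()

    # Attach to your streaming dataframe (here called "stream_df"):
    stream_df.writeStream.foreachBatch(write_batch).start()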

Ryan

On Wed, Aug 30, 2023 at 10:36 AM Nirav Patel <nira...@gmail.com> wrote:

> Should I try the "fanout-enabled" option within the foreachBatch method
> where I do dataframe.write?
>
> On Wed, Aug 30, 2023 at 10:29 AM Nirav Patel <nira...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using Spark Structured Streaming with the foreachBatch sink to
>> append to an Iceberg table with two hidden partition fields.
>> I got this infamous error about the incoming dataframe needing to be
>> clustered by partition:
>>
>> *Incoming records violate the writer assumption that records are
>> clustered by spec and by partition within each spec. Either cluster the
>> incoming records or switch to fanout writers.*
>>
>> I tried setting "fanout-enabled" to "true" before calling foreachBatch,
>> but it didn't work at all. I got the same error.
>>
>> I tried partitionedBy(days("date"), col("customerid")) and that didn't
>> work either.
>>
>> Then I used the Spark SQL approach:
>>
>> INSERT INTO {dest_schema_fqn}
>> SELECT * FROM {success_agg_tbl} ORDER BY date(date), tenant
>>
>> and that worked.
>>
>> I know of the following table-level configs:
>> write.spark.fanout.enabled - false
>> write.distribution-mode - none
>> but I have left them at the defaults, as I assume the writer will
>> override those settings.
>>
>> So does the "fanout-enabled" option have any effect when used with
>> foreachBatch? (I'm new to Spark streaming as well.)
>>
>> thanks
>>
>
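
One more note on the table-level properties mentioned above: on a recent
Spark/Iceberg combination (Spark 3.2 or newer), you can also set the
table's distribution mode so Iceberg requests the right clustering at
write time, instead of repartitioning each batch by hand. A sketch, again
with a hypothetical table name:

    spark.sql("""
        ALTER TABLE db.events
        SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
    """)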

-- 
Ryan Blue
Tabular
