Yes, if you're planning on running compaction anyway, you may want to use the fanout writers. I still don't recommend them, because they use a lot of memory to buffer Parquet data and they create more data files. Up to you, though.
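If you do go that route, a rough sketch of the trade-off might look like this (untested; the table name, catalog name, and checkpoint path are placeholders):

    # Streaming append with the fanout writer enabled, so incoming records
    # don't need to be pre-clustered by the table's partition spec.
    query = (df.writeStream
        .format("iceberg")
        .outputMode("append")
        .option("fanout-enabled", "true")
        .option("checkpointLocation", "/tmp/checkpoints/events")
        .toTable("db.events"))

    # Periodically compact the extra small files the fanout writer produces:
    spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'db.events')")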
On Fri, Sep 8, 2023 at 8:07 AM Nirav Patel <nira...@gmail.com> wrote:

> Thanks Ryan for responding.
> I think the Iceberg streaming-writes documentation recommends fanout
> writers for streaming:
> https://iceberg.apache.org/docs/latest/spark-structured-streaming/#writing-against-partitioned-table
> Regarding too many files: that can happen regardless of fanout or
> presorting the dataframe, right? E.g., I am ingesting event data and
> writing it to a date(ts)-partitioned table, so for each day there is only
> one partition, but I'm ingesting and writing event data every 5 minutes,
> so by the end of the day I will have more than 200 files per partition.
> It seems I will have to run compaction regardless.
>
> Best
> Nirav
>
> On Thu, Aug 31, 2023 at 8:59 AM Ryan Blue <b...@tabular.io> wrote:
>
>> We generally don't recommend fanout writers because they create lots of
>> small data files. It also isn't clear why the table's partitioning isn't
>> causing Spark to distribute the data properly -- maybe you're using an
>> old Spark version?
>>
>> In any case, you can distribute the data yourself to align with the
>> table's partitioning. You can either use `.repartition` with the
>> dataframes API, or you can sort. I recommend sorting by adding an
>> `ORDER BY` statement to your SQL.
>>
>> Ryan
>>
>> On Wed, Aug 30, 2023 at 10:36 AM Nirav Patel <nira...@gmail.com> wrote:
>>
>>> Should I try the "fanout-enabled" option within the foreachBatch
>>> method where I do dataframe.write?
>>>
>>> On Wed, Aug 30, 2023 at 10:29 AM Nirav Patel <nira...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using Spark Structured Streaming with a foreachBatch sink to
>>>> append to an Iceberg table with two hidden partition columns.
>>>> I got this infamous error about the input dataframe or partitions
>>>> needing to be clustered:
>>>>
>>>> *Incoming records violate the writer assumption that records are
>>>> clustered by spec and by partition within each spec. Either cluster the
>>>> incoming records or switch to fanout writers.*
>>>>
>>>> I tried setting "fanout-enabled" to "true" before calling
>>>> foreachBatch, but it didn't work at all; I got the same error.
>>>>
>>>> I tried partitionedBy(days("date"), col("customerid")) and that
>>>> didn't work either.
>>>>
>>>> Then I used the Spark SQL approach:
>>>> INSERT INTO {dest_schema_fqn}
>>>> SELECT * from {success_agg_tbl} ORDER BY date(date), tenant
>>>>
>>>> and that worked.
>>>>
>>>> I know of the following table-level configs:
>>>> write.spark.fanout.enabled - false
>>>> write.distribution-mode - none
>>>> but I have left them at their defaults, as I assume the writer will
>>>> override those settings.
>>>>
>>>> So does the "fanout-enabled" option have any effect when used with
>>>> foreachBatch? (I'm new to Spark streaming as well.)
>>>>
>>>> Thanks
>>>
>> --
>> Ryan Blue
>> Tabular

--
Ryan Blue
Tabular
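For reference, the dataframe equivalent of the ORDER BY fix discussed above -- sorting each micro-batch by the partition expressions before appending inside foreachBatch -- might look roughly like this (untested sketch; the column names come from the thread, while the table name and checkpoint path are placeholders):

    from pyspark.sql import functions as F

    def append_batch(batch_df, batch_id):
        # Sort by the partition expressions so records arrive clustered by
        # partition, mirroring the "ORDER BY date(date), tenant" SQL above.
        (batch_df
            .orderBy(F.to_date("date"), F.col("tenant"))
            .writeTo("db.dest_table")
            .append())

    query = (df.writeStream
        .foreachBatch(append_batch)
        .option("checkpointLocation", "/tmp/checkpoints/dest")
        .start())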