Anton's solution is right. Iceberg doesn't allow any task to open more than one file in a partition to avoid creating a huge number of output files.
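To make the check concrete, here is a toy model of it in plain Scala. This is only a sketch, not Iceberg's actual writer code: `ToyPartitionedWriter` and `ToyWriterDemo` are hypothetical names, records are reduced to their partition value, and the writer is assumed to keep exactly one file open at a time.

```scala
import scala.collection.mutable

// Toy model of a partitioned writer; NOT Iceberg's actual implementation.
final class ToyPartitionedWriter {
  private val closed = mutable.Set.empty[String] // partitions whose file was already closed
  private var current: Option[String] = None     // partition of the file that is open now
  private var opened = 0                         // number of files opened so far

  def write(partition: String): Unit = {
    if (!current.contains(partition)) {
      // The partition changed: close the open file and remember that it is finished.
      current.foreach(closed += _)
      if (closed.contains(partition)) {
        throw new IllegalStateException(s"Already closed file for partition: $partition")
      }
      current = Some(partition)
      opened += 1
    }
    // A real writer would append the record to the open file here.
  }

  def filesOpened: Int = opened
}

object ToyWriterDemo {
  // Feeds the partition values of one task's records to a fresh writer.
  // Returns the number of files opened, or the error message on failure.
  def run(partitions: Seq[String]): Either[String, Int] = {
    val writer = new ToyPartitionedWriter
    try {
      partitions.foreach(writer.write)
      Right(writer.filesOpened)
    } catch {
      case e: IllegalStateException => Left(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    val interleaved = Seq("2000-01-01", "2000-01-02", "2000-01-01")
    println(run(interleaved))        // fails: 2000-01-01 comes back after its file was closed
    println(run(interleaved.sorted)) // succeeds: each partition's records are contiguous
  }
}
```

In this model the interleaved input fails on the third record, while the same records sorted by partition value open one file per partition and succeed. That is why grouping each partition's records together within a task avoids the exception.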
We generally recommend applying a global sort to data when writing from batch, or adding a repartition to ensure that each task gets the data for just one partition. That may not work here, though.

Looks like the problem here is that your data has records for 2000-01-01 mixed with, say, 2000-01-02. Iceberg closes the 2000-01-01 file to open a file for 2000-01-02. Then it needs 2000-01-01 again, finds that it's already closed that file, and throws the exception.

If you can repartition by happened_at_day, that would fix it. Otherwise, I think it may be a good idea to keep files open in the streaming writer. We don't do that in batch because it can take so much memory, but in streaming you can't necessarily add a sortWithinPartitions to group the data together.

On Wed, Aug 14, 2019 at 9:12 AM Anton Okolnychyi <aokolnyc...@apple.com.invalid> wrote:

> Hi,
>
> The exception you see is because of a check in Iceberg that prevents the
> same Spark task from writing too many small files for the same partition.
> It is the same for batch and stream writes. To avoid that, you should
> collocate all records for the same data partition within your Spark
> partition. That can be done by sorting the data by partition columns
> either globally or within partitions.
>
> Spark file source does the same but implicitly [1].
>
> Hope that helps,
> Anton
>
> [1] -
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179
>
> On 14 Aug 2019, at 17:03, Dave Sugden <dave.sug...@shopify.com.INVALID>
> wrote:
>
> Hi,
> We would like to be able to use the iceberg spark Datasource
> (IcebergSource) to write kafka sourced streaming dataframes.
>
> In tests, we are able to successfully create a partitioned table and write
> when using the MemoryStream, but when using a kafka source:
>
> spark.readStream.format("kafka")
>
> and writing to iceberg:
>
> dataFrame.writeStream
>   .format("catwalk-iceberg")
>   .outputMode(OutputMode.Append)
>   .trigger(Trigger.Once)
>   .option("path", uri.toString)
>   .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
>   .start
>   .awaitTermination
>
> we get this exception:
>
> Caused by: java.lang.IllegalStateException: Already closed file for
> partition: happened_at_day=2000-01-01
>     at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
>     at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
>
> Before I dig deeper, is this something that has worked for anyone?
>
> Thanks!

--
Ryan Blue
Software Engineer
Netflix