Hi,

The exception you see is because of a check in Iceberg that prevents the same 
Spark task from writing too many small files for the same partition. It is the 
same for batch and stream writes. To avoid it, you should co-locate all records 
for the same data partition within your Spark partition. That can be done by 
sorting the data by partition columns, either globally or within partitions.

The Spark file source does the same, but implicitly [1].
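
For example, something like this applied to the dataframe before the write (a 
rough sketch, not tested against your pipeline; the happened_at column name is 
a guess based on the happened_at_day partition value in the stack trace below):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  // Cluster rows so that each Spark task hands the writer all records for a
  // given Iceberg partition together (a local sort is enough; a global
  // df.sort(...) works as well).
  def clusterByPartition(df: DataFrame): DataFrame =
    df.sortWithinPartitions(col("happened_at"))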

Hope that helps,
Anton

[1] https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179

> On 14 Aug 2019, at 17:03, Dave Sugden <dave.sug...@shopify.com.INVALID> wrote:
> 
> Hi,
> We would like to be able to use the Iceberg Spark DataSource (IcebergSource) 
> to write Kafka-sourced streaming dataframes.
> 
> In tests, we are able to successfully create a partitioned table and write 
> when using the MemoryStream, but when using a Kafka source:
> 
> spark.readStream.format("kafka")
> 
> and writing to iceberg:
> 
> dataFrame.writeStream
>           .format("catwalk-iceberg")
>           .outputMode(OutputMode.Append)
>           .trigger(Trigger.Once)
>           .option("path", uri.toString)
>           .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
>           .start
>           .awaitTermination
> 
> we get this exception:
> 
> Caused by: java.lang.IllegalStateException: Already closed file for partition: happened_at_day=2000-01-01
>   at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
>   at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
> 
> Before I dig deeper, is this something that has worked for anyone?
> 
> Thanks!
> 
