The repartition by happened_at_day worked. Thanks!
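For anyone who hits the same exception, here is a rough sketch of what that fix looks like end to end. The happened_at_day column, the "catwalk-iceberg" format name, and the Trigger.Once trigger come from the thread below; the SparkSession setup, Kafka options, payload parsing, and paths are placeholders I'm assuming for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, to_date}
    import org.apache.spark.sql.streaming.{OutputMode, Trigger}

    val spark = SparkSession.builder().appName("iceberg-streaming-write").getOrCreate()

    // Read from Kafka and derive the table's partition column (parsing is illustrative).
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "events")                        // placeholder
      .load()
      .selectExpr("CAST(value AS STRING) AS payload", "timestamp")
      .withColumn("happened_at_day", to_date(col("timestamp")))

    // Repartition by the Iceberg partition column so each task only ever holds rows
    // for one partition and the writer never has to reopen a file it already closed.
    events
      .repartition(col("happened_at_day"))
      .writeStream
      .format("catwalk-iceberg")
      .outputMode(OutputMode.Append)
      .trigger(Trigger.Once)
      .option("path", "s3://bucket/warehouse/db/events")                          // placeholder
      .option("checkpointLocation", "s3://bucket/warehouse/db/events/checkpoint") // placeholder
      .start()
      .awaitTermination()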
On Wed, Aug 14, 2019 at 12:53 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Anton's solution is right. Iceberg doesn't allow any task to open more
> than one file in a partition, to avoid creating a huge number of output
> files.
>
> We generally recommend applying a global sort to data when writing from
> batch, or adding a repartition to ensure that each task gets the data for
> just one partition. That may not work here, though.
>
> Looks like the problem here is that your data has records for 2000-01-01
> mixed with, say, 2000-01-02. Iceberg closes the 2000-01-01 file to open a
> file for 2000-01-02. Then it needs 2000-01-01 again, finds that it's
> already closed that file, and throws the exception.
>
> If you can repartition by happened_at_day, that would fix it. Otherwise, I
> think it may be a good idea to keep files open in the streaming writer. We
> don't do that in batch because it can take so much memory, but in streaming
> you can't necessarily add a sortWithinPartitions to group the data together.
>
> On Wed, Aug 14, 2019 at 9:12 AM Anton Okolnychyi
> <aokolnyc...@apple.com.invalid> wrote:
>
>> Hi,
>>
>> The exception you see is because of a check in Iceberg that prevents the
>> same Spark task from writing too many small files for the same partition.
>> It is the same for batch and stream writes. To avoid it, you should
>> collocate all records for the same data partition within your Spark
>> partition. That can be done by sorting the data by the partition columns,
>> either globally or within partitions.
>>
>> The Spark file source does the same, but implicitly [1].
>>
>> Hope that helps,
>> Anton
>>
>> [1] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179
>>
>>
>> On 14 Aug 2019, at 17:03, Dave Sugden <dave.sug...@shopify.com.INVALID> wrote:
>>
>> Hi,
>> We would like to be able to use the Iceberg Spark Datasource
>> (IcebergSource) to write Kafka-sourced streaming dataframes.
>>
>> In tests, we are able to successfully create a partitioned table and
>> write when using the MemoryStream, but when using a Kafka source:
>>
>>   spark.readStream.format("kafka")
>>
>> and writing to Iceberg:
>>
>>   dataFrame.writeStream
>>     .format("catwalk-iceberg")
>>     .outputMode(OutputMode.Append)
>>     .trigger(Trigger.Once)
>>     .option("path", uri.toString)
>>     .option("checkpointLocation", Paths.get(uri.toString, "checkpoint").toString)
>>     .start
>>     .awaitTermination
>>
>> we get this exception:
>>
>> Caused by: java.lang.IllegalStateException: Already closed file for
>> partition: happened_at_day=2000-01-01
>>   at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
>>   at org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
>>
>> Before I dig deeper, is this something that has worked for anyone?
>>
>> Thanks!
>
> --
> Ryan Blue
> Software Engineer
> Netflix
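As a footnote on the batch-side advice quoted above (global sort, or repartition plus sortWithinPartitions), the two options might look roughly like the sketch below. batchDf, the column name, and the table location are assumptions for illustration, not the thread's actual code:

    import org.apache.spark.sql.functions.col

    // batchDf: the DataFrame being appended to the partitioned table (assumed).

    // Option 1: global sort by the partition column, so each task receives a
    // contiguous range of partition values.
    batchDf
      .orderBy(col("happened_at_day"))
      .write
      .format("iceberg")
      .mode("append")
      .save("s3://bucket/warehouse/db/events") // placeholder table location

    // Option 2: repartition by the partition column, then sort within each task,
    // so a task sees all of its rows for a given partition together.
    batchDf
      .repartition(col("happened_at_day"))
      .sortWithinPartitions(col("happened_at_day"))
      .write
      .format("iceberg")
      .mode("append")
      .save("s3://bucket/warehouse/db/events") // placeholder table location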