The repartition by happened_at_day worked. Thanks!

On Wed, Aug 14, 2019 at 12:53 PM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> Anton's solution is right. Iceberg doesn't allow any task to open more
> than one file in a partition to avoid creating a huge number of output
> files.
>
> We generally recommend applying a global sort to data when writing from
> batch, or adding a repartition to ensure that each task gets the data for
> just one partition. That may not work here, though.
>
> Looks like the problem here is that your data has records for 2000-01-01
> mixed with, say, 2000-01-02. Iceberg closes the 2000-01-01 file to open a
> file for 2000-01-02. Then it needs 2000-01-01 again, finds that it's
> already closed that file, and throws the exception.
>
> If you can repartition by happened_at_day, that would fix it. Otherwise, I
> think it may be a good idea to keep files open in the streaming writer. We
> don't do that in batch because it can take so much memory, but in streaming
> you can't necessarily add a sortWithinPartitions to group the data together.
>
> On Wed, Aug 14, 2019 at 9:12 AM Anton Okolnychyi
> <aokolnyc...@apple.com.invalid> wrote:
>
>> Hi,
>>
>> The exception you see is because of a check in Iceberg that prevents the
>> same Spark task to write too many small files for the same partition. It is
>> the same for batch and stream writes. To avoid that, you should collocate
>> all records for the same data partition within your Spark partition. That
>> can be done by sorting the data by partition columns either globally or
>> within partitions.
>>
>> Spark file source does the same but implicitly [1].
>>
>> Hope that helps,
>> Anton
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L179
>>
>>
>> On 14 Aug 2019, at 17:03, Dave Sugden <dave.sug...@shopify.com.INVALID>
>> wrote:
>>
>> Hi,
>> We would like to be able to use the iceberg spark Datasource
>> (IcebergSource) to write kafka sourced streaming dataframes.
>>
>> In tests, we are able to succesfully create a partitioned table and write
>> when using the MemoryStream, but when using a kafka source:
>>
>> *spark.readStream.format("kafka")*
>>
>> and writing to iceberg:
>>
>>
>>
>>
>>
>>
>>
>>
>> *dataFrame.writeStream          .format("catwalk-iceberg")
>> .outputMode(OutputMode.Append)          .trigger(Trigger.Once)
>> .option("path", uri.toString)          .option("checkpointLocation",
>> Paths.get(uri.toString, "checkpoint").toString)          .start
>> .awaitTermination*
>>
>> we get this exception:
>>
>> Caused by: java.lang.IllegalStateException: Already closed file for
>> partition: happened_at_day=2000-01-01
>> at
>> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:389)
>> at
>> org.apache.iceberg.spark.source.Writer$PartitionedWriter.write(Writer.java:350)
>>
>> Before I dig deeper, is this something that has worked for anyone?
>>
>> Thanks!
>>
>>
>>
>>
>>
>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Reply via email to