domnikl opened a new issue, #8609:
URL: https://github.com/apache/iceberg/issues/8609

   ### Apache Iceberg version
   
   1.3.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   I am using Spark Structured Streaming to load data from a Kafka topic and 
store it in a partitioned Iceberg table using a `Trigger.Once()` trigger. It is 
scheduled to run every hour and looks like this:
   
   ```kotlin
   spark
       .readStream()
       .format("kafka")
       .option("kafka.bootstrap.servers", kafkaBootstrapServer)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
       .option("failOnDataLoss", "false")
       .load()
       .writeStream()
       .trigger(Trigger.Once())
       .format("iceberg")
       .option("path", catalogTable)
       .outputMode("append")
       .option("fanout-enabled", "true")
       .option("checkpointLocation", "$checkpointBaseLocation/$topic")
       .start()
       .awaitTermination()
   ```
   
   I've noticed that the resulting tables are inconsistent: a lot of the data 
is missing, while some rows are duplicated on every subsequent run, because 
the same file names are used per partition and task on each run. Looking into 
it, I found this 
[comment](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L52)
 about generating unique file names across different JVM instances.
   The `operationId` is [filled with the streaming query 
ID](https://github.com/apache/iceberg/blob/f7a7eb2c10cb4a9b6b3ea5bfdfc5d085be8b9c31/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L659),
 which Spark persists in the checkpoint (`metadata` file) and reuses on every 
restart. The `operationId` would therefore also need to be unique across 
multiple (subsequent) runs of the streaming query, and currently it is not.
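   
   To illustrate the collision, here is a minimal sketch. It assumes the data 
file name is built from partition ID, task ID, `operationId` and a per-writer 
file counter; `dataFileName` is a hypothetical stand-in for that layout, not 
Iceberg's actual code, and the `.parquet` extension is also an assumption:
   
   ```kotlin
   import java.util.UUID

   // Hypothetical stand-in for the file name layout (an assumption, not Iceberg's code).
   fun dataFileName(partitionId: Int, taskId: Long, operationId: String, fileCount: Int): String =
       "%05d-%d-%s-%05d.parquet".format(partitionId, taskId, operationId, fileCount)

   fun main() {
       // The query ID is stored in the checkpoint, so every hourly run sees the same value.
       val queryId = UUID.randomUUID().toString()

       val firstRun = dataFileName(0, 0L, queryId, 1)
       val secondRun = dataFileName(0, 0L, queryId, 1)
       println("same ID, names collide: ${firstRun == secondRun}")

       // An ID that is regenerated for each run yields a different name.
       val freshId = UUID.randomUUID().toString()
       println("fresh ID, names collide: ${dataFileName(0, 0L, freshId, 1) == firstRun}")
   }
   ```
   
   With an identical partition, task and counter, only the `operationId` can 
tell two runs apart, so reusing it means the second run writes to the same 
path as the first.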
   
   As a workaround, I delete the `metadata` file before each run, which forces 
Spark to generate a new ID and results in unique file names. This is a bit 
ugly, though.
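   
   Roughly, the workaround looks like the sketch below. It assumes the 
checkpoint directory is on a local file system (a checkpoint on HDFS or object 
storage would need the matching file system API instead), and `resetQueryId` 
is a hypothetical helper name:
   
   ```kotlin
   import java.nio.file.Files
   import java.nio.file.Path

   // Hypothetical helper: removes <checkpointDir>/metadata if it exists.
   // Returns true if a file was deleted, false if there was nothing to delete.
   fun resetQueryId(checkpointDir: Path): Boolean =
       Files.deleteIfExists(checkpointDir.resolve("metadata"))

   fun main() {
       // Stand-in for "$checkpointBaseLocation/$topic"; a temp directory keeps the demo self-contained.
       val checkpointDir = Files.createTempDirectory("checkpoint-demo")
       Files.write(checkpointDir.resolve("metadata"), "placeholder".toByteArray())

       println("deleted: ${resetQueryId(checkpointDir)}")
       println("still exists: ${Files.exists(checkpointDir.resolve("metadata"))}")
   }
   ```
   
   The helper would be called right before `start()`. It only touches the 
`metadata` file and leaves the rest of the checkpoint directory in place.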
   
   As I understand it, files should never be overwritten (when using 
`append`), even when the query is restarted, because overwriting them corrupts 
previously written snapshots. Maybe using the run ID instead of the query ID 
would be a viable solution here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

