randomdev2026 opened a new issue, #15418:
URL: https://github.com/apache/iceberg/issues/15418

   ### Query engine
   
   Flink 2.0.1
   
   ### Question
   
   Restarting (pause/resume) a Flink job that uses IcebergSource seems to retrieve data only from the latest Parquet file. If more than one Parquet file is written during the downtime (by some other Flink job), that data is missed.
   
   Apache Iceberg 1.10.1 with HadoopCatalog. The Flink checkpointing interval is set to 10 seconds.
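   For reference, checkpointing is enabled roughly like this (a sketch; our actual setup has more configuration around it):

   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   var env = StreamExecutionEnvironment.getExecutionEnvironment();
   // Checkpoint every 10 seconds (the interval is given in milliseconds)
   env.enableCheckpointing(10_000);
   ```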
   
   I am new to Apache Flink and Iceberg, so this is probably a configuration issue?
   
   We use two Flink jobs:
   **Job1** reads from Kafka and writes Iceberg tables to S3:
   
   ```
   var kafkaSource = KafkaSourceFactory
           .configure(
                   jobConfig.getKafkaBroker(),
                   jobConfig.getKafkaTopic(),
                   KafkaSourceFactory.getKafkaRecordDeserializationSchema())
           // Start from the earliest offset when there is no prior state
           .setStartingOffsets(OffsetsInitializer.earliest())
           .setGroupId(jobConfig.getKafkaGroupId())
           .setProperties(kafkaProperties)
           .build();
   ...
   FlinkSink.forRow(streamRow, FlinkSchemaUtil.toResolvedSchema(icebergSchema))
           .tableLoader(TableLoader.fromCatalog(catalogLoader, tableIdentifier))
           // Write Parquet data files to the main branch
           .set("write-format", "parquet")
           .toBranch("main")
           .append();
   ```
   
   **Job2** reads the Iceberg table from S3:
   
   ```
   var icebergSource = IcebergSource.forRowData()
           .tableLoader(tableLoader)
           .assignerFactory(new SimpleSplitAssignerFactory())
           .streaming(true)
           // On a fresh start, begin reading from the latest table snapshot
           .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
           .monitorInterval(Duration.ofSeconds(jobConfig.getIcebergMonitorInterval()))
           .build();
   ```
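   In case it matters, I also considered pinning an explicit starting snapshot instead of the latest one. A sketch of what I mean (I am not certain the builder method names are right, and `someKnownSnapshotId` is a placeholder):

   ```java
   var icebergSource = IcebergSource.forRowData()
           .tableLoader(tableLoader)
           .assignerFactory(new SimpleSplitAssignerFactory())
           .streaming(true)
           // Start incrementally from a known snapshot ID instead of the latest snapshot
           .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
           .startSnapshotId(someKnownSnapshotId)
           .monitorInterval(Duration.ofSeconds(jobConfig.getIcebergMonitorInterval()))
           .build();
   ```

   But I would have expected the checkpointed/savepointed position to make this unnecessary on resume.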
   
   If Job2 is stopped for a few seconds or minutes and some messages are published to Kafka during that time, resuming Job2 only picks up data from the last Parquet file (not from, e.g., the last two Parquet files).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

