dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762686286


   > @dixingxing0 can you describe the motivation of checkpointing the watermarks in Flink state?
   > 
   > Ryan described our use of watermarks in snapshot metadata. They are used to indicate the data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
   
   Thanks @stevenzwu. Regarding the watermark state, I am just following the current restore behavior:
   ```java
         // Restored data files keyed by checkpoint id; keep only the checkpoints
         // strictly newer than the last one committed to the Iceberg table.
         NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
             .newTreeMap(checkpointsState.get().iterator().next())
             .tailMap(maxCommittedCheckpointId, false);
         if (!uncommittedDataFiles.isEmpty()) {
           // Commit all uncommitted data files from the old Flink job to the Iceberg table.
           long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
           commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
         }
   ```
   Since Flink will commit the last uncommitted checkpoint on restore, I think we should also store the correct watermark for that checkpoint.
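   A minimal sketch of the idea, assuming the watermark is kept in a separate per-checkpoint map next to the restored data files. `CheckpointWatermarks`, `snapshotState`, `watermarkToCommitOnRestore` and `notifyCommitted` are hypothetical names, not Iceberg's actual committer API: the watermark is recorded when each checkpoint is snapshotted, and on restore the watermark of the newest uncommitted checkpoint is the one to commit alongside its data files.

   ```java
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.OptionalLong;
   import java.util.TreeMap;

   // Hypothetical sketch, not Iceberg's actual committer state: remembers the
   // watermark seen at each checkpoint so a restored job can commit with it.
   class CheckpointWatermarks {
     // checkpointId -> watermark observed when that checkpoint was snapshotted
     private final NavigableMap<Long, Long> watermarkPerCheckpoint = new TreeMap<>();

     // Record the current watermark under this checkpoint id.
     void snapshotState(long checkpointId, long currentWatermark) {
       watermarkPerCheckpoint.put(checkpointId, currentWatermark);
     }

     // Mirrors tailMap(maxCommittedCheckpointId, false) in the restore path:
     // returns the watermark of the newest checkpoint not yet committed,
     // or empty if everything up to maxCommittedCheckpointId is committed.
     OptionalLong watermarkToCommitOnRestore(long maxCommittedCheckpointId) {
       Map.Entry<Long, Long> newest =
           watermarkPerCheckpoint.tailMap(maxCommittedCheckpointId, false).lastEntry();
       return newest == null ? OptionalLong.empty() : OptionalLong.of(newest.getValue());
     }

     // After a successful commit up to checkpointId, drop the entries it covered.
     void notifyCommitted(long checkpointId) {
       watermarkPerCheckpoint.headMap(checkpointId, true).clear();
     }
   }
   ```

   Keying the watermark by checkpoint id, rather than keeping only the latest value, is what lets the restore path pick the watermark that matches `maxUncommittedCheckpointId`.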
   
   Our use case is exactly the same as the one you and @rdblue described, except that we don't have multiple writers 😄.
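   For that use case, the downstream trigger can be sketched as a simple comparison, assuming the committed watermark is available as epoch milliseconds (how it is read from the snapshot metadata is not shown here). `HourlyCompleteness`, `isHourComplete` and `completeHours` are hypothetical names: an hourly window counts as complete once the committed watermark has reached the end of that hour.

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.List;
   import java.util.stream.Collectors;

   // Hypothetical helper: decides which hourly windows a downstream batch job may
   // process, given the watermark committed with the latest Iceberg snapshot.
   class HourlyCompleteness {
     private static final long HOUR_MS = Duration.ofHours(1).toMillis();

     // The hour [hourStart, hourStart + 1h) is complete once the committed
     // watermark has reached the end of the hour.
     static boolean isHourComplete(Instant hourStart, long committedWatermarkMs) {
       return committedWatermarkMs >= hourStart.toEpochMilli() + HOUR_MS;
     }

     // Keep only the pending hours whose data is complete and can be triggered.
     static List<Instant> completeHours(List<Instant> pendingHourStarts, long committedWatermarkMs) {
       return pendingHourStarts.stream()
           .filter(start -> isHourComplete(start, committedWatermarkMs))
           .collect(Collectors.toList());
     }
   }
   ```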

