dixingxing0 edited a comment on pull request #2109:
URL: https://github.com/apache/iceberg/pull/2109#issuecomment-762686286
> @dixingxing0 can you describe the motivation of checkpointing the watermarks in Flink state?
>
> Ryan described our use of watermarks in snapshot metadata. They are used to indicate the data completeness on the ingestion path so that downstream batch consumer jobs can be triggered when data is complete for a window (like hourly).
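To make the completeness check concrete, here is a minimal sketch of how a downstream batch job might decide that an hourly window is complete. It assumes the committer records the Flink watermark (epoch milliseconds) in the snapshot summary under a hypothetical key `flink.watermark`, and it models the summary as a plain `Map<String, String>` (the type returned by Iceberg's `Snapshot.summary()`) so the example runs without Iceberg on the classpath.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

public final class WindowCompleteness {
    // Hypothetical summary key; the property name a real writer uses may differ.
    public static final String WATERMARK_KEY = "flink.watermark";

    private WindowCompleteness() {}

    /** Returns true if the recorded watermark has passed the end of the hour starting at windowStart. */
    public static boolean isHourComplete(Map<String, String> snapshotSummary, Instant windowStart) {
        String raw = snapshotSummary.get(WATERMARK_KEY);
        if (raw == null) {
            return false; // no watermark recorded, so completeness is unknown
        }
        long watermarkMillis = Long.parseLong(raw);
        Instant windowEnd = windowStart.plus(Duration.ofHours(1));
        // A Flink watermark W means no more events with timestamp <= W are expected;
        // requiring W >= end is a conservative completeness check for [start, end).
        return watermarkMillis >= windowEnd.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant hour = Instant.parse("2021-01-19T10:00:00Z");
        Map<String, String> summary = Map.of(
            WATERMARK_KEY, String.valueOf(Instant.parse("2021-01-19T11:05:00Z").toEpochMilli()));
        System.out.println(isHourComplete(summary, hour)); // true
    }
}
```

A scheduler could poll the latest snapshot and trigger the hourly batch job once this returns `true`.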
Thanks @stevenzwu. Regarding the watermark state, I am just following the current restore behavior:
```java
// Checkpoints newer than the last one committed to the table belong to the
// old job and have not been committed yet.
NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
    .newTreeMap(checkpointsState.get().iterator().next())
    .tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
  // Commit all uncommitted data files from the old Flink job to the Iceberg table.
  long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
  commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
}
```
Since Flink will commit the last uncommitted checkpoint on restore, I think we should also store the correct watermark for that checkpoint in state.
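A minimal sketch of that idea, assuming the committer keeps one state entry per checkpoint that pairs the serialized data-file manifest with the watermark seen at snapshot time. `WatermarkCommitterState`, `Entry`, `Commit`, `snapshot`, and `restore` are hypothetical names, not Iceberg's actual committer API; `restore` mirrors the quoted `tailMap(maxCommittedCheckpointId, false)` logic and picks the watermark of the newest uncommitted checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical committer state: per checkpoint, the data-file manifest plus its watermark. */
public final class WatermarkCommitterState {

    /** One state entry per checkpoint (not Iceberg's real state layout). */
    public static final class Entry {
        final byte[] manifest;
        final long watermark;

        Entry(byte[] manifest, long watermark) {
            this.manifest = manifest;
            this.watermark = watermark;
        }
    }

    /** What a restore-time commit would contain: checkpoint ids and the watermark to record. */
    public static final class Commit {
        public final List<Long> checkpointIds;
        public final long watermark;

        Commit(List<Long> checkpointIds, long watermark) {
            this.checkpointIds = checkpointIds;
            this.watermark = watermark;
        }
    }

    private final NavigableMap<Long, Entry> state = new TreeMap<>();

    /** On snapshotState: store the data files and the current watermark together. */
    public void snapshot(long checkpointId, byte[] manifest, long currentWatermark) {
        state.put(checkpointId, new Entry(manifest, currentWatermark));
    }

    /** On restore: commit every checkpoint after the last committed one; null if none remain. */
    public Commit restore(long maxCommittedCheckpointId) {
        NavigableMap<Long, Entry> uncommitted = state.tailMap(maxCommittedCheckpointId, false);
        if (uncommitted.isEmpty()) {
            return null;
        }
        long maxUncommittedCheckpointId = uncommitted.lastKey();
        // Watermarks are non-decreasing, so the newest checkpoint's value covers all of them.
        long watermark = uncommitted.get(maxUncommittedCheckpointId).watermark;
        return new Commit(new ArrayList<>(uncommitted.keySet()), watermark);
    }

    public static void main(String[] args) {
        WatermarkCommitterState s = new WatermarkCommitterState();
        s.snapshot(1L, new byte[0], 1_000L);
        s.snapshot(2L, new byte[0], 2_000L);
        s.snapshot(3L, new byte[0], 3_000L);
        Commit c = s.restore(1L);
        System.out.println(c.checkpointIds + " @ " + c.watermark); // [2, 3] @ 3000
    }
}
```

Without the per-checkpoint watermark, the restored job would have no way to know which watermark belongs to the files it commits on behalf of the old job.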
Our use case is exactly the same as the one you and @rdblue described, except that we don't have multiple writers 😄.