mxm commented on issue #14090:
URL: https://github.com/apache/iceberg/issues/14090#issuecomment-3319216551

   Thanks for reporting this issue! I checked the code to see how the scenario 
you described can occur. Here is what happens from the `DynamicCommitter` 
perspective.
   
   **Components**
   - DynamicWriter
   - DynamicWriteResultAggregator
   - DynamicCommitter
   
   **Flow**
   1. Checkpoint 1 gets triggered from the source. The checkpoint barrier 
travels through the DAG until it reaches the sink.
   2. The checkpoint barrier arrives at the sink.
   3. Flink's sink framework triggers `DynamicWriter#prepareCommit()`, which 
sends a DynamicWriteResult downstream to DynamicWriteResultAggregator.
   4. The checkpoint barrier gets sent downstream and arrives at 
DynamicWriteResultAggregator.
   5. `DynamicWriteResultAggregator#prepareSnapshotPreBarrier(1)` gets called. 
Manifests are written for each DynamicWriteResult and sent downstream to 
DynamicCommitter.
   6. The checkpoint barrier gets sent downstream and arrives at 
DynamicCommitter.
   7. `DynamicCommitter#commit()` gets called. DynamicCommitter groups the 
committables by `TableKey (table name, branch name) => checkpoint id => 
committables`.
   8. The maximum committed checkpoint id is looked up per TableKey from the 
`flink.max-committed-checkpoint-id` snapshot property. Every table/branch has 
its own snapshot property for the max committed checkpoint id. Committables 
whose checkpoint id is less than or equal to that max checkpoint id are 
filtered out.
   9. For each TableKey (table name / branch), we commit _each_ committable as 
an Iceberg table snapshot.
   10. Checkpoint 1 completes.
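
   Steps 7 and 8 can be sketched roughly as follows. This is a simplified 
model, not the actual `DynamicCommitter` code: the `TableKey` and `Committable` 
records, the `groupAndFilter` method, and the `-1` default for a table/branch 
with no committed checkpoint are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CommitGrouping {
  // Hypothetical stand-ins for illustration; not the Iceberg classes.
  record TableKey(String tableName, String branch) {}

  record Committable(TableKey key, long checkpointId, String manifest) {}

  // Groups committables as TableKey => checkpoint id => committables and drops
  // those at or below the max committed checkpoint id of their TableKey.
  static Map<TableKey, NavigableMap<Long, List<Committable>>> groupAndFilter(
      List<Committable> committables, Map<TableKey, Long> maxCommittedCheckpointId) {
    Map<TableKey, NavigableMap<Long, List<Committable>>> grouped = new HashMap<>();
    for (Committable c : committables) {
      // Assumption: -1 means nothing has been committed for this table/branch yet.
      long maxCommitted = maxCommittedCheckpointId.getOrDefault(c.key(), -1L);
      if (c.checkpointId() <= maxCommitted) {
        continue; // treated as already committed
      }
      grouped
          .computeIfAbsent(c.key(), k -> new TreeMap<>())
          .computeIfAbsent(c.checkpointId(), id -> new ArrayList<>())
          .add(c);
    }
    return grouped;
  }

  public static void main(String[] args) {
    TableKey main = new TableKey("db.events", "main");
    TableKey audit = new TableKey("db.events", "audit");
    List<Committable> committables = List.of(
        new Committable(main, 1L, "m1.avro"),
        new Committable(main, 2L, "m2.avro"),
        new Committable(main, 2L, "m3.avro"),
        new Committable(audit, 1L, "m4.avro"));
    // Only the main branch has committed checkpoint 1 so far.
    Map<TableKey, Long> maxCommitted = Map.of(main, 1L);

    var grouped = groupAndFilter(committables, maxCommitted);
    System.out.println(grouped.get(main).keySet());        // prints [2]
    System.out.println(grouped.get(main).get(2L).size());  // prints 2
    System.out.println(grouped.get(audit).keySet());       // prints [1]
  }
}
```

   Note that checkpoint 2 on the main branch still holds two committables, 
which is the situation step 9 has to handle.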
   
   (9) is where the flaw lies, because every committable (WriteResult) updates 
the snapshot properties with the max checkpoint id. If we fail after committing 
only some of a checkpoint's committables, the rest are filtered out in (8) on 
retry. We can fix this issue by staging all WriteResults for a given TableKey 
and only then committing. This will also work across branches because the max 
checkpoint id is already maintained per branch. Even if we fail in the process, 
we will only attempt to commit the WriteResults for a given TableKey which 
haven't been committed previously.
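
   To make the difference concrete, here is a minimal in-memory sketch of the 
two strategies. It is only a model under stated assumptions: `Branch`, 
`commitEach`, `commitStaged`, and the `failAfter` parameter are hypothetical 
names, and the real fix may look different.

```java
import java.util.ArrayList;
import java.util.List;

public class StagedCommit {
  // Hypothetical in-memory stand-in for one table branch.
  static class Branch {
    final List<String> committedFiles = new ArrayList<>();
    long maxCommittedCheckpointId = -1L;

    // One atomic snapshot: files and checkpoint id become visible together.
    void commitSnapshot(List<String> files, long checkpointId) {
      committedFiles.addAll(files);
      maxCommittedCheckpointId = checkpointId;
    }
  }

  // Per-committable strategy from (9): one snapshot per WriteResult.
  // failAfter simulates a crash once that many snapshots were committed
  // (use -1 for no failure).
  static void commitEach(
      Branch branch, long checkpointId, List<List<String>> writeResults, int failAfter) {
    if (checkpointId <= branch.maxCommittedCheckpointId) {
      return; // filter from (8)
    }
    int done = 0;
    for (List<String> writeResult : writeResults) {
      if (done == failAfter) {
        throw new IllegalStateException("simulated failure");
      }
      branch.commitSnapshot(writeResult, checkpointId);
      done++;
    }
  }

  // Staged strategy: collect all WriteResults first, then commit once.
  static void commitStaged(Branch branch, long checkpointId, List<List<String>> writeResults) {
    if (checkpointId <= branch.maxCommittedCheckpointId) {
      return; // filter from (8)
    }
    List<String> staged = new ArrayList<>();
    for (List<String> writeResult : writeResults) {
      staged.addAll(writeResult);
    }
    branch.commitSnapshot(staged, checkpointId);
  }

  public static void main(String[] args) {
    List<List<String>> writeResults = List.of(List.of("a.parquet"), List.of("b.parquet"));

    Branch perCommittable = new Branch();
    try {
      commitEach(perCommittable, 1L, writeResults, 1); // crash after the first snapshot
    } catch (IllegalStateException e) {
      commitEach(perCommittable, 1L, writeResults, -1); // retry gets filtered out
    }
    System.out.println(perCommittable.committedFiles); // prints [a.parquet]

    Branch staged = new Branch();
    commitStaged(staged, 1L, writeResults);
    commitStaged(staged, 1L, writeResults); // retry after success is a no-op
    System.out.println(staged.committedFiles); // prints [a.parquet, b.parquet]
  }
}
```

   In the per-committable run, `b.parquet` never gets committed because the 
retry sees checkpoint 1 as already done. In the staged run, the checkpoint id 
only advances together with all of the checkpoint's files.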


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

