rionmonster opened a new issue, #2420:
URL: https://github.com/apache/fluss/issues/2420

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   0.8.0 (latest release)
   
   ### Please describe the bug 🐞
   
   While investigating a separate bug, I noticed an issue that can cause 
transient failures in `IcebergLakeCommitter.commit()`. When a commit contains 
both new data files _and_ rewrite results in the same transaction, the 
`commitRewrite()` operation can block for a very long time (15+ minutes 
observed) because it repeatedly retries a validation that cannot succeed.
   
   The following chain of events can occur:
   1. **New data/delete files are committed via `AppendFiles` or `RowDelta`**
   This creates a new Iceberg snapshot (creating snapshot N+1).
   2. **`commitRewrite()` is called**
   This calls `validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId())`,
   which validates from snapshot N (the snapshot at which the rewrite was planned).
   Snapshot N+1 now exists from Step 1, so validation detects conflicting 
changes and fails.
   3. **Iceberg retries the rewrite commit, but it will never succeed because:**
   The table was already modified in Step 1.
   Each retry re-validates against the same old snapshot.
   The conflict is permanent within this commit cycle.
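
The sequence above can be reproduced with a small toy model. This is only a sketch: `ToyTable`, `commitAppend`, `commitRewrite`, and `rewriteSucceeds` are hypothetical names, not Iceberg or Fluss APIs, and the model assumes that *any* snapshot committed after the planned one counts as a conflict, which is likely stricter than Iceberg's real validation.

```java
/** Toy model of the commit ordering problem; not Iceberg's API. */
public class CommitOrderSketch {

    /** Hypothetical stand-in for a table that only tracks its current snapshot id. */
    static final class ToyTable {
        private long currentSnapshotId;

        ToyTable(long initialSnapshotId) {
            this.currentSnapshotId = initialSnapshotId;
        }

        long currentSnapshotId() {
            return currentSnapshotId;
        }

        /** Unvalidated commit (append-like): always succeeds and advances the snapshot. */
        long commitAppend() {
            return ++currentSnapshotId;
        }

        /**
         * Validated commit (rewrite-like). ASSUMPTION: it conflicts whenever the
         * table has moved past plannedSnapshotId. Returns the new id, or -1 on conflict.
         */
        long commitRewrite(long plannedSnapshotId) {
            if (currentSnapshotId != plannedSnapshotId) {
                return -1L;
            }
            return ++currentSnapshotId;
        }
    }

    /** Runs both commits in the given order; returns true if the rewrite succeeded. */
    static boolean rewriteSucceeds(boolean rewriteFirst, int maxAttempts) {
        ToyTable table = new ToyTable(1L);
        long planned = table.currentSnapshotId(); // snapshot N, when the rewrite was planned

        if (!rewriteFirst) {
            table.commitAppend(); // creates snapshot N+1 before the rewrite runs
        }

        boolean ok = false;
        // Every retry validates against the same planned snapshot, so a conflict never clears.
        for (int attempt = 0; attempt < maxAttempts && !ok; attempt++) {
            ok = table.commitRewrite(planned) != -1L;
        }

        if (rewriteFirst) {
            table.commitAppend(); // appends are not validated, so this still succeeds
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println("data first:    " + rewriteSucceeds(false, 5)); // false
        System.out.println("rewrite first: " + rewriteSucceeds(true, 5));  // true
    }
}
```

In this model the data-first order exhausts every retry without success, while committing the rewrite first succeeds on the first attempt.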
   
   We can see this demonstrated in the corresponding code as well:
   ```java
   // IcebergLakeCommitter.java:100-147
   @Override
   public long commit(IcebergCommittable committable, Map<String, String> snapshotProperties)
           throws IOException {
       try {
           icebergTable.refresh();
   
           // ... build snapshotUpdate ...
   
           // Step 1: Commit data files first (creates new snapshot)
           long snapshotId = commit(snapshotUpdate, snapshotProperties);
   
           // Step 2: Then try to commit rewrite (validates against OLD snapshot)
           List<RewriteDataFileResult> rewriteDataFileResults =
                   committable.rewriteDataFileResults();
           if (!rewriteDataFileResults.isEmpty()) {
               Long rewriteCommitSnapshotId =
                       commitRewrite(rewriteDataFileResults, snapshotProperties);
               // ...
           }
           return snapshotId;
       } catch (Exception e) {
           throw new IOException("Failed to commit to Iceberg table.", e);
       }
   }
   ```
   
   ```java
   // IcebergLakeCommitter.java:149-185
   private Long commitRewrite(
           List<RewriteDataFileResult> rewriteDataFileResults,
           Map<String, String> snapshotProperties) {
       icebergTable.refresh();
       RewriteFiles rewriteFiles = icebergTable.newRewrite();
       try {
           // Validates against the snapshot when rewrite was planned
           // But Step 1 already created a newer snapshot!
           rewriteFiles.validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId());
           // ... add files ...
           return commit(rewriteFiles, snapshotProperties); // This fails validation
       } catch (Exception e) {
           // Error handling - deletes added files
           LOG.error("Failed to commit rewrite files to iceberg...", e);
           CatalogUtil.deleteFiles(...);
           return null;
       }
   }
   ```
   
   ### Solution
   
   This should be straightforward to fix by reordering the operations within 
`commit()` so that any pending rewrite is committed before the new data files, 
while the snapshot it was planned against is still current. Something similar 
to the following:
   
   ```java
   @Override
   public long commit(IcebergCommittable committable, Map<String, String> snapshotProperties)
           throws IOException {
       try {
           icebergTable.refresh();
   
           long snapshotId;
   
           // Step 1: Commit rewrite FIRST (while snapshot is still valid)
           List<RewriteDataFileResult> rewriteDataFileResults =
                   committable.rewriteDataFileResults();
           if (!rewriteDataFileResults.isEmpty()) {
               Long rewriteCommitSnapshotId =
                       commitRewrite(rewriteDataFileResults, snapshotProperties);
               if (rewriteCommitSnapshotId != null) {
                   snapshotId = rewriteCommitSnapshotId;
               }
           }
   
           // Step 2: Then commit new data files
           SnapshotUpdate<?> snapshotUpdate;
           if (committable.getDeleteFiles().isEmpty()) {
               AppendFiles appendFiles = icebergTable.newAppend();
               committable.getDataFiles().forEach(appendFiles::appendFile);
               snapshotUpdate = appendFiles;
           } else {
               RowDelta rowDelta = icebergTable.newRowDelta();
               committable.getDataFiles().forEach(rowDelta::addRows);
               committable.getDeleteFiles().forEach(rowDelta::addDeletes);
               snapshotUpdate = rowDelta;
           }
           snapshotId = commit(snapshotUpdate, snapshotProperties);
   
           return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null.");
       } catch (Exception e) {
           throw new IOException("Failed to commit to Iceberg table.", e);
       }
   }
   ```
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


