rionmonster opened a new issue, #2420: URL: https://github.com/apache/fluss/issues/2420
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

0.8.0 (latest release)

### Please describe the bug 🐞

While investigating a separate bug, I noticed an issue that can cause transient failures during the `IcebergLakeCommitter.commit()` process. When a commit contains both new data files _and_ rewrite results in the same transaction, the `commitRewrite()` operation can block for extremely long periods of time (15+ minutes observed) because the rewrite commit is retried repeatedly and fails validation each time.

The following chain of events can occur:

1. **New data/delete files are committed via `AppendFiles` or `RowDelta`.**
   This creates a new Iceberg snapshot (N+1).
2. **`commitRewrite()` is called.**
   This calls `validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId())`. The validation expects snapshot N (the snapshot that was current when the rewrite was planned), but snapshot N+1 now exists from Step 1. Validation fails because conflicting changes are detected.
3. **Iceberg retries the rewrite commit, but it will never succeed because:**
   - The table was already modified in Step 1.
   - Each retry re-validates against the same old snapshot.
   - The conflict is permanent within this commit cycle.

We can see this demonstrated in the corresponding code as well:

```java
// IcebergLakeCommitter.java:100-147
@Override
public long commit(IcebergCommittable committable, Map<String, String> snapshotProperties)
        throws IOException {
    try {
        icebergTable.refresh();
        // ... build snapshotUpdate ...

        // Step 1: Commit data files first (creates new snapshot)
        long snapshotId = commit(snapshotUpdate, snapshotProperties);

        // Step 2: Then try to commit rewrite (validates against OLD snapshot)
        List<RewriteDataFileResult> rewriteDataFileResults =
                committable.rewriteDataFileResults();
        if (!rewriteDataFileResults.isEmpty()) {
            Long rewriteCommitSnapshotId =
                    commitRewrite(rewriteDataFileResults, snapshotProperties);
            // ...
        }
        return snapshotId;
    } catch (Exception e) {
        throw new IOException("Failed to commit to Iceberg table.", e);
    }
}
```

```java
// IcebergLakeCommitter.java:149-185
private Long commitRewrite(
        List<RewriteDataFileResult> rewriteDataFileResults,
        Map<String, String> snapshotProperties) {
    icebergTable.refresh();
    RewriteFiles rewriteFiles = icebergTable.newRewrite();
    try {
        // Validates against the snapshot when rewrite was planned
        // But Step 1 already created a newer snapshot!
        rewriteFiles.validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId());
        // ... add files ...
        return commit(rewriteFiles, snapshotProperties); // This fails validation
    } catch (Exception e) {
        // Error handling - deletes added files
        LOG.error("Failed to commit rewrite files to iceberg...", e);
        CatalogUtil.deleteFiles(...);
        return null;
    }
}
```

### Solution

This should be fairly easy to address by reordering the operations within `commit()` so that any pending rewrite is committed before the new data files are introduced, similar to the following:

```java
@Override
public long commit(IcebergCommittable committable, Map<String, String> snapshotProperties)
        throws IOException {
    try {
        icebergTable.refresh();
        long snapshotId;

        // Step 1: Commit rewrite FIRST (while snapshot is still valid)
        List<RewriteDataFileResult> rewriteDataFileResults =
                committable.rewriteDataFileResults();
        if (!rewriteDataFileResults.isEmpty()) {
            Long rewriteCommitSnapshotId =
                    commitRewrite(rewriteDataFileResults, snapshotProperties);
            if (rewriteCommitSnapshotId != null) {
                snapshotId = rewriteCommitSnapshotId;
            }
        }

        // Step 2: Then commit new data files
        SnapshotUpdate<?> snapshotUpdate;
        if (committable.getDeleteFiles().isEmpty()) {
            AppendFiles appendFiles = icebergTable.newAppend();
            committable.getDataFiles().forEach(appendFiles::appendFile);
            snapshotUpdate = appendFiles;
        } else {
            RowDelta rowDelta = icebergTable.newRowDelta();
            committable.getDataFiles().forEach(rowDelta::addRows);
            committable.getDeleteFiles().forEach(rowDelta::addDeletes);
            snapshotUpdate = rowDelta;
        }
        snapshotId = commit(snapshotUpdate, snapshotProperties);
        return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null.");
    } catch (Exception e) {
        throw new IOException("Failed to commit to Iceberg table.", e);
    }
}
```

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
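
For readers who want to see the ordering problem from this issue in isolation, here is a minimal sketch that does not depend on Iceberg or Fluss. `ToyTable` and `CommitOrderDemo` are hypothetical names invented for illustration. The conflict rule is a deliberate simplification that follows the issue's description (any snapshot created after the planned one counts as a conflict); Iceberg's real validation logic may be more selective.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a table with a linear snapshot history (not the Iceberg API). */
class ToyTable {
    private final List<Long> snapshots = new ArrayList<>();

    ToyTable() {
        snapshots.add(1L); // initial snapshot "N"
    }

    long currentSnapshotId() {
        return snapshots.get(snapshots.size() - 1);
    }

    /** Appends data unconditionally and returns the new snapshot id. */
    long commitAppend() {
        long next = currentSnapshotId() + 1;
        snapshots.add(next);
        return next;
    }

    /**
     * Commits a rewrite that was planned at plannedSnapshotId.
     * Simplifying assumption: any newer snapshot is treated as a conflict.
     */
    long commitRewrite(long plannedSnapshotId) {
        if (currentSnapshotId() != plannedSnapshotId) {
            throw new IllegalStateException(
                    "Rewrite planned at snapshot " + plannedSnapshotId
                            + " but table is at " + currentSnapshotId());
        }
        long next = currentSnapshotId() + 1;
        snapshots.add(next);
        return next;
    }
}

class CommitOrderDemo {
    /** Order described as buggy: data first, then rewrite. True if the rewrite committed. */
    static boolean dataThenRewrite() {
        ToyTable table = new ToyTable();
        long planned = table.currentSnapshotId(); // rewrite planned at N
        table.commitAppend();                     // creates N+1
        try {
            table.commitRewrite(planned);         // validates against N, table is at N+1
            return true;
        } catch (IllegalStateException e) {
            return false;                         // retrying would fail the same way
        }
    }

    /** Proposed order: rewrite first, then data. True if both commits succeeded. */
    static boolean rewriteThenData() {
        ToyTable table = new ToyTable();
        long planned = table.currentSnapshotId(); // rewrite planned at N
        try {
            table.commitRewrite(planned);         // table is still at N
        } catch (IllegalStateException e) {
            return false;
        }
        table.commitAppend();                     // append has no validation in this model
        return true;
    }

    public static void main(String[] args) {
        System.out.println("data then rewrite: " + dataThenRewrite());
        System.out.println("rewrite then data: " + rewriteThenData());
    }
}
```

In this model the data-then-rewrite order always fails the rewrite, and no number of retries helps because the planned snapshot id never changes, while the rewrite-then-data order succeeds.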
