This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bfc52d218a Core: Fix compute_table_stats failures with concurrent writes (#15148)
bfc52d218a is described below

commit bfc52d218adf9fc0229601055c2238f580bcac3a
Author: hemanthboyina <[email protected]>
AuthorDate: Fri Feb 6 00:28:02 2026 +0530

    Core: Fix compute_table_stats failures with concurrent writes (#15148)
    
    * Core: Fix compute_table_stats failures with concurrent writes
    
    * fixed test failures and moved tests to TestSetStatistics
    
    * updated the test variables with configured value
    
    * updated and removed the changes of base and item
    
    * updated concurrent modification test
    
    * updated test naming conventions and nit
---
 .../java/org/apache/iceberg/SetStatistics.java     | 30 +++++++++++--
 .../java/org/apache/iceberg/TestSetStatistics.java | 50 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java
index ceb3fe91ba..01e06fa16b 100644
--- a/core/src/main/java/org/apache/iceberg/SetStatistics.java
+++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java
@@ -18,12 +18,24 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
 
 public class SetStatistics implements UpdateStatistics {
+
   private final TableOperations ops;
   private final Map<Long, Optional<StatisticsFile>> statisticsToSet = 
Maps.newHashMap();
 
@@ -50,9 +62,21 @@ public class SetStatistics implements UpdateStatistics {
 
   @Override
   public void commit() {
-    TableMetadata base = ops.current();
-    TableMetadata newMetadata = internalApply(base);
-    ops.commit(base, newMetadata);
+    TableMetadata current = ops.current(); // retry settings come from one metadata version
+    Tasks.foreach(ops)
+        .retry(current.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            current.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            current.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            current.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */)
+        .onlyRetryOn(CommitFailedException.class) // other exceptions are not retried
+        .run(
+            taskOps -> {
+              TableMetadata base = taskOps.refresh(); // fresh base on every attempt
+              TableMetadata updated = internalApply(base);
+              taskOps.commit(base, updated);
+            });
   }
 
   private TableMetadata internalApply(TableMetadata base) {
diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java
index e51614f45b..68336e44dc 100644
--- a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java
+++ b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java
@@ -106,4 +106,54 @@ public class TestSetStatistics extends TestBase {
     assertThat(version()).isEqualTo(3);
     assertThat(metadata.statisticsFiles()).isEmpty();
   }
+
+  @TestTemplate
+  public void setStatisticsRetryWithConcurrentModification() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotId = readMetadata().currentSnapshot().snapshotId();
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of());
+
+    // Simulate a concurrent writer: FILE_B is appended just before the first commit attempt
+    TableOperations concurrentOps =
+        new TestTables.TestTableOperations("test", tableDir, table.ops().io()) {
+          private boolean firstAttempt = true;
+
+          @Override
+          public void commit(TableMetadata base, TableMetadata metadata) {
+            if (firstAttempt) {
+              firstAttempt = false;
+              table.newFastAppend().appendFile(FILE_B).commit();
+            }
+
+            super.commit(base, metadata);
+          }
+        };
+
+    SetStatistics setStats = new SetStatistics(concurrentOps);
+    setStats.setStatistics(statisticsFile);
+    setStats.commit();
+
+    assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile);
+    assertThat(readMetadata().snapshots()).hasSize(2); // FILE_B's snapshot was kept
+  }
+
+  @TestTemplate
+  public void setStatisticsRetrySuccess() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotId = readMetadata().currentSnapshot().snapshotId();
+
+    GenericStatisticsFile statisticsFile =
+        new GenericStatisticsFile(
+            snapshotId, "/some/statistics/file.puffin", 100, 42, ImmutableList.of());
+
+    TestTables.TestTableOperations ops = table.ops();
+    ops.failCommits(2); // inject two commit failures so commit() has to retry
+
+    table.updateStatistics().setStatistics(statisticsFile).commit();
+
+    assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile);
+  }
 }

Reply via email to