This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bfc52d218a Core: Fix compute_table_stats failures with concurrent writes (#15148)
bfc52d218a is described below
commit bfc52d218adf9fc0229601055c2238f580bcac3a
Author: hemanthboyina <[email protected]>
AuthorDate: Fri Feb 6 00:28:02 2026 +0530
Core: Fix compute_table_stats failures with concurrent writes (#15148)
* Core: Fix compute_table_stats failures with concurrent writes
* Fix test failures and move tests to TestSetStatistics
* Update the test variables to use the configured values
* Update and remove the changes to base and item
* Update the concurrent modification test
* Update test naming conventions and address a nit
---
.../java/org/apache/iceberg/SetStatistics.java | 30 +++++++++++--
.../java/org/apache/iceberg/TestSetStatistics.java | 50 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java
index ceb3fe91ba..01e06fa16b 100644
--- a/core/src/main/java/org/apache/iceberg/SetStatistics.java
+++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java
@@ -18,12 +18,24 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Tasks;
public class SetStatistics implements UpdateStatistics {
+
private final TableOperations ops;
private final Map<Long, Optional<StatisticsFile>> statisticsToSet =
Maps.newHashMap();
@@ -50,9 +62,21 @@ public class SetStatistics implements UpdateStatistics {
   @Override
   public void commit() {
-    TableMetadata base = ops.current();
-    TableMetadata newMetadata = internalApply(base);
-    ops.commit(base, newMetadata);
+    // Retry config is read once; each attempt re-applies the changes to refreshed metadata.
+    TableMetadata current = ops.current();
+    Tasks.foreach(ops)
+        .retry(current.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            current.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            current.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            current.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */)
+        .onlyRetryOn(CommitFailedException.class)
+        .run(
+            taskOps -> {
+              TableMetadata base = taskOps.refresh();
+              taskOps.commit(base, internalApply(base));
+            });
   }
private TableMetadata internalApply(TableMetadata base) {
diff --git a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java
index e51614f45b..68336e44dc 100644
--- a/core/src/test/java/org/apache/iceberg/TestSetStatistics.java
+++ b/core/src/test/java/org/apache/iceberg/TestSetStatistics.java
@@ -106,4 +106,54 @@ public class TestSetStatistics extends TestBase {
assertThat(version()).isEqualTo(3);
assertThat(metadata.statisticsFiles()).isEmpty();
}
+
+ @TestTemplate
+ public void setStatisticsRetryWithConcurrentModification() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotId = readMetadata().currentSnapshot().snapshotId();
+
+ GenericStatisticsFile statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId, "/some/statistics/file.puffin", 100, 42,
ImmutableList.of());
+
+ // A second TableOperations over the same table; on the first commit attempt only, another
+ // writer appends FILE_B so that attempt's base is stale and is expected to fail and retry.
+ TableOperations concurrentOps =
+ new TestTables.TestTableOperations("test", tableDir, table.ops().io())
{
+ private boolean firstAttempt = true;
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ if (firstAttempt) {
+ firstAttempt = false;
+ table.newFastAppend().appendFile(FILE_B).commit();
+ }
+
+ super.commit(base, metadata);
+ }
+ };
+
+ SetStatistics setStats = new SetStatistics(concurrentOps);
+ setStats.setStatistics(statisticsFile);
+ setStats.commit();
+ // The statistics update must survive the concurrent append.

assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile);
+ }
+
+ @TestTemplate
+ public void setStatisticsRetrySuccess() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotId = readMetadata().currentSnapshot().snapshotId();
+
+ GenericStatisticsFile statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId, "/some/statistics/file.puffin", 100, 42,
ImmutableList.of());
+ // Assumes failCommits(n) rejects the next n commits; the update must then succeed on retry.
+ TestTables.TestTableOperations ops = table.ops();
+ ops.failCommits(2);
+
+ table.updateStatistics().setStatistics(statisticsFile).commit();
+

assertThat(readMetadata().statisticsFiles()).containsExactly(statisticsFile);
+ }
}