This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d2551a6670 Add SnapshotUpdateValidator to validate snapshots on commit
(#14509)
d2551a6670 is described below
commit d2551a6670efabec958eb97c4bc28ffbba587633
Author: Daniel Weeks <[email protected]>
AuthorDate: Thu Nov 6 17:28:27 2025 -0800
Add SnapshotUpdateValidator to validate snapshots on commit (#14509)
* Add support for snapshot ancestry validation with SnapshotUpdate
---
.../apache/iceberg/SnapshotAncestryValidator.java | 54 ++++++++++++++
.../java/org/apache/iceberg/SnapshotUpdate.java | 5 ++
.../java/org/apache/iceberg/SnapshotProducer.java | 34 ++++++++-
.../org/apache/iceberg/TestSnapshotProducer.java | 86 +++++++++++++++++++++-
4 files changed, 177 insertions(+), 2 deletions(-)
diff --git
a/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
new file mode 100644
index 0000000000..64b579a1a3
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+
+/**
+ * A check applied to a table's snapshot history while a snapshot update is being committed.
+ *
+ * <p>The check is invoked after the table metadata has been refreshed, so it sees any changes to
+ * the table state made since the update was created.
+ */
+@FunctionalInterface
+public interface SnapshotAncestryValidator extends Function<Iterable<Snapshot>, Boolean> {
+
+  /** A validator that accepts every snapshot ancestry, i.e. performs no validation. */
+  SnapshotAncestryValidator NON_VALIDATING = ignored -> Boolean.TRUE;
+
+  /**
+   * Decides whether the pending update may be committed on top of the given snapshot history.
+   *
+   * <p>Implementations should always return a non-null value.
+   *
+   * @param baseSnapshots the snapshot ancestry taken from the refreshed base table metadata
+   * @return {@code true} if the update is valid, {@code false} to reject it
+   */
+  @Override
+  Boolean apply(Iterable<Snapshot> baseSnapshots);
+
+  /**
+   * Describes why validation failed; this text is included in the {@link
+   * org.apache.iceberg.exceptions.ValidationException} raised when the update is rejected.
+   *
+   * @return a non-null, human-readable failure message
+   */
+  @Nonnull
+  default String errorMessage() {
+    return "error message not provided";
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index cc6b02dee4..73509c1538 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -71,4 +71,9 @@ public interface SnapshotUpdate<ThisT> extends
PendingUpdate<Snapshot> {
"Cannot commit to branch %s: %s does not support branch commits",
branch, this.getClass().getName()));
}
+
+  /**
+   * Registers a validator that inspects the snapshot ancestry before this update is committed.
+   *
+   * <p>This default implementation does not support validation and always throws; snapshot
+   * producers that support it override this method.
+   *
+   * @param validator the ancestry validator to apply on commit
+   * @return this update, for method chaining (only in implementations that support validation)
+   * @throws UnsupportedOperationException if this update does not support snapshot validation
+   */
+  default ThisT validateWith(SnapshotAncestryValidator validator) {
+    String updateType = getClass().getName();
+    throw new UnsupportedOperationException(
+        String.format("Snapshot validation not supported by %s", updateType));
+  }
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index d11f466434..ce02637d98 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.metrics.CommitMetrics;
import org.apache.iceberg.metrics.CommitMetricsResult;
@@ -117,6 +118,8 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
private TableMetadata base;
private boolean stageOnly = false;
private Consumer<String> deleteFunc = defaultDelete;
+ private SnapshotAncestryValidator snapshotAncestryValidator =
+ SnapshotAncestryValidator.NON_VALIDATING;
private ExecutorService workerPool;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
@@ -159,6 +162,20 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
return self();
}
+  /**
+   * Set a validator to check snapshot ancestry before committing changes.
+   *
+   * <p>The validator is given the ancestry of the parent snapshot as resolved against the
+   * refreshed base metadata. If there is no parent snapshot, an empty iterable is supplied to the
+   * validator. If the validator rejects the ancestry, the commit fails with a {@link
+   * ValidationException}.
+   *
+   * @param validator a validator to check snapshot ancestry validity; must not be null
+   * @return this for method chaining
+   * @throws IllegalArgumentException if {@code validator} is null
+   */
+  @Override
+  public ThisT validateWith(SnapshotAncestryValidator validator) {
+    // Reject null eagerly: storing it would only surface later as a NullPointerException while
+    // the commit is running, far away from the call that caused it.
+    if (validator == null) {
+      throw new IllegalArgumentException("Invalid snapshot ancestry validator: null");
+    }
+
+    this.snapshotAncestryValidator = validator;
+    return self();
+  }
+
protected TableOperations ops() {
return ops;
}
@@ -257,7 +274,8 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
long sequenceNumber = base.nextSequenceNumber();
Long parentSnapshotId = parentSnapshot == null ? null :
parentSnapshot.snapshotId();
- validate(base, parentSnapshot);
+ runValidations(parentSnapshot);
+
List<ManifestFile> manifests = apply(base, parentSnapshot);
OutputFile manifestList = manifestListPath();
@@ -327,6 +345,20 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
writer.toManifestListFile().encryptionKeyID());
}
+  /**
+   * Runs {@code validate(base, parentSnapshot)} and then the configured snapshot ancestry check.
+   *
+   * @param parentSnapshot the parent of the snapshot being produced, or null if there is none
+   * @throws ValidationException if the ancestry validator does not accept the snapshot history
+   */
+  private void runValidations(Snapshot parentSnapshot) {
+    validate(base, parentSnapshot);
+
+    // Validate the ancestry of the parent as seen by the (refreshed) base metadata; with no
+    // parent there is no history, so the validator receives an empty iterable.
+    Iterable<Snapshot> snapshotAncestry =
+        parentSnapshot != null
+            ? SnapshotUtil.ancestorsOf(parentSnapshot.snapshotId(), base::snapshot)
+            : List.of();
+
+    // The validator returns a boxed Boolean. Treat anything other than TRUE (including null) as a
+    // rejection instead of letting unboxing throw a NullPointerException. The error message is
+    // only requested when the commit is actually rejected.
+    Boolean valid = snapshotAncestryValidator.apply(snapshotAncestry);
+    if (!Boolean.TRUE.equals(valid)) {
+      throw new ValidationException(
+          "Snapshot ancestry validation failed: %s", snapshotAncestryValidator.errorMessage());
+    }
+  }
+
protected abstract Map<String, String> summary();
/** Returns the snapshot summary from the implementation and updates totals.
*/
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index c3e238e3bc..956242f66e 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -18,11 +18,22 @@
*/
package org.apache.iceberg;
+import static org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
-public class TestSnapshotProducer {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSnapshotProducer extends TestBase {
@Test
public void testManifestFileGroupSize() {
@@ -74,4 +85,77 @@ public class TestSnapshotProducer {
int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize,
fileCount);
assertThat(writerCount).as(errMsg).isEqualTo(expectedManifestWriterCount);
}
+
+  @TestTemplate
+  public void testCommitValidationPreventsCommit() throws IOException {
+    table.newAppend().commit();
+    Snapshot snapshotBeforeValidation = table.currentSnapshot();
+    String validationMessage = "Validation force failed";
+
+    // Create a SnapshotAncestryValidator that rejects every commit with a custom message
+    SnapshotAncestryValidator validator =
+        new SnapshotAncestryValidator() {
+          @Override
+          public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+            return false;
+          }
+
+          @Nonnull
+          @Override
+          public String errorMessage() {
+            return validationMessage;
+          }
+        };
+
+    // Test that the validator rejects the commit and that its message is surfaced
+    AppendFiles append1 = table.newAppend().validateWith(validator).appendFile(FILE_A);
+    assertThatThrownBy(append1::commit)
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Snapshot ancestry validation failed: " + validationMessage);
+
+    // Verify the file was not committed and no new snapshot was produced
+    assertThat(table.currentSnapshot().snapshotId())
+        .isEqualTo(snapshotBeforeValidation.snapshotId());
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().allManifests(table.io())).isEmpty();
+  }
+
+  @TestTemplate
+  public void testCommitValidationWithCustomSummaryProperties() throws IOException {
+    String wapId = "wap-12345-staging-audit";
+
+    // Rejects the commit if any snapshot in the ancestry already published this WAP id
+    SnapshotAncestryValidator customPropertyValidator =
+        baseSnapshots -> {
+          List<String> alreadyPublished =
+              Streams.stream(baseSnapshots)
+                  .map(Snapshot::summary)
+                  .filter(summary -> summary.containsKey(PUBLISHED_WAP_ID_PROP))
+                  .map(summary -> summary.get(PUBLISHED_WAP_ID_PROP))
+                  .collect(Collectors.toList());
+          return !alreadyPublished.contains(wapId);
+        };
+
+    // First append: nothing has published the WAP id yet, so the commit goes through
+    table
+        .newFastAppend()
+        .validateWith(customPropertyValidator)
+        .appendFile(FILE_A)
+        .set(PUBLISHED_WAP_ID_PROP, wapId)
+        .commit();
+
+    String publishedWapId = table.currentSnapshot().summary().get(PUBLISHED_WAP_ID_PROP);
+    assertThat(publishedWapId).isEqualTo(wapId);
+
+    // Second append reuses the same published WAP id and must be rejected by the validator
+    AppendFiles duplicateAppend =
+        table
+            .newFastAppend()
+            .validateWith(customPropertyValidator)
+            .appendFile(FILE_A)
+            .set(PUBLISHED_WAP_ID_PROP, wapId);
+
+    assertThatThrownBy(duplicateAppend::commit)
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Snapshot ancestry validation failed");
+
+    // The rejected append must not have produced a new snapshot
+    assertThat(table.snapshots()).hasSize(1);
+  }
}