This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a55d1235d2 Spark 4.1 | 4.0 | 3.5 | 3.4: Fail publish_changes procedure if there's more than one matching snapshot (#14955)
a55d1235d2 is described below

commit a55d1235d20a542b7de4cc60772dad172a414861
Author: Sam Wheating <[email protected]>
AuthorDate: Thu Jan 29 15:52:23 2026 -0800

    Spark 4.1 | 4.0 | 3.5 | 3.4: Fail publish_changes procedure if there's more than one matching snapshot (#14955)
    
    * Fail publish_changes procedure if there are multiple matching snapshots
    
    * rewrite publish_changes procedure to early-exit on duplicated wap.id
    
    * Update docs for publish_changes procedure
    
    * run spotlessApply
    
    * Update docs/docs/spark-procedures.md
    
    * backport fix to spark 3.4, 3.5, 4.0
---
 docs/docs/spark-procedures.md                      |  2 ++
 .../extensions/TestPublishChangesProcedure.java    | 20 ++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 27 ++++++++++++----------
 .../extensions/TestPublishChangesProcedure.java    | 20 ++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 27 ++++++++++++----------
 .../extensions/TestPublishChangesProcedure.java    | 20 ++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 27 ++++++++++++----------
 .../extensions/TestPublishChangesProcedure.java    | 20 ++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 27 ++++++++++++----------
 9 files changed, 142 insertions(+), 48 deletions(-)

diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
index addcf87d78..c040f49ef8 100644
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
@@ -189,6 +189,8 @@ publish_changes creates a new snapshot from an existing snapshot without alterin
 
 Only append and dynamic overwrite snapshots can be successfully published.
 
+The `publish_changes` procedure will fail if there are multiple snapshots in the table with the provided `wap_id`.
+
 !!! info
     This procedure invalidates all cached Spark plans that reference the affected table.
 
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 08f44c8f01..d9319801d1 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -161,6 +161,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
         .hasMessage("Cannot apply unknown WAP ID 'not_valid'");
   }
 
+  @TestTemplate
+  public void testApplyDuplicateWapId() {
+
+    String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+  }
+
   @TestTemplate
   public void testInvalidApplyWapChangesCases() {
     assertThatThrownBy(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 2c3ce7418e..a47e754153 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -18,10 +18,8 @@
  */
 package org.apache.iceberg.spark.procedures;
 
-import java.util.Optional;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.WapUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -92,21 +90,26 @@ class PublishChangesProcedure extends BaseProcedure {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Optional<Snapshot> wapSnapshot =
-              Optional.ofNullable(
-                  Iterables.find(
-                      table.snapshots(),
-                      snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
-                      null));
-          if (!wapSnapshot.isPresent()) {
+          Snapshot matchingSnap = null;
+          for (Snapshot snap : table.snapshots()) {
+            if (wapId.equals(WapUtil.stagedWapId(snap))) {
+              if (matchingSnap != null) {
+                throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+                    wapId);
+              } else {
+                matchingSnap = snap;
+              }
+            }
+          }
+
+          if (matchingSnap == null) {
             throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
           }
 
-          long wapSnapshotId = wapSnapshot.get().snapshotId();
+          long wapSnapshotId = matchingSnap.snapshotId();
           table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
           Snapshot currentSnapshot = table.currentSnapshot();
-
           InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
           return new InternalRow[] {outputRow};
         });
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 08f44c8f01..d9319801d1 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -161,6 +161,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
         .hasMessage("Cannot apply unknown WAP ID 'not_valid'");
   }
 
+  @TestTemplate
+  public void testApplyDuplicateWapId() {
+
+    String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+  }
+
   @TestTemplate
   public void testInvalidApplyWapChangesCases() {
     assertThatThrownBy(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 2c3ce7418e..a47e754153 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -18,10 +18,8 @@
  */
 package org.apache.iceberg.spark.procedures;
 
-import java.util.Optional;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.WapUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -92,21 +90,26 @@ class PublishChangesProcedure extends BaseProcedure {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Optional<Snapshot> wapSnapshot =
-              Optional.ofNullable(
-                  Iterables.find(
-                      table.snapshots(),
-                      snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
-                      null));
-          if (!wapSnapshot.isPresent()) {
+          Snapshot matchingSnap = null;
+          for (Snapshot snap : table.snapshots()) {
+            if (wapId.equals(WapUtil.stagedWapId(snap))) {
+              if (matchingSnap != null) {
+                throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+                    wapId);
+              } else {
+                matchingSnap = snap;
+              }
+            }
+          }
+
+          if (matchingSnap == null) {
             throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
           }
 
-          long wapSnapshotId = wapSnapshot.get().snapshotId();
+          long wapSnapshotId = matchingSnap.snapshotId();
           table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
           Snapshot currentSnapshot = table.currentSnapshot();
-
           InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
           return new InternalRow[] {outputRow};
         });
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 4958fde15d..c72770e1ce 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -159,6 +159,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
         .hasMessage("Cannot apply unknown WAP ID 'not_valid'");
   }
 
+  @TestTemplate
+  public void testApplyDuplicateWapId() {
+
+    String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+  }
+
   @TestTemplate
   public void testInvalidApplyWapChangesCases() {
     assertThatThrownBy(
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 8748882043..8cb0a2bfb7 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -19,10 +19,8 @@
 package org.apache.iceberg.spark.procedures;
 
 import java.util.Iterator;
-import java.util.Optional;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.WapUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -97,21 +95,26 @@ class PublishChangesProcedure extends BaseProcedure {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Optional<Snapshot> wapSnapshot =
-              Optional.ofNullable(
-                  Iterables.find(
-                      table.snapshots(),
-                      snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
-                      null));
-          if (!wapSnapshot.isPresent()) {
+          Snapshot matchingSnap = null;
+          for (Snapshot snap : table.snapshots()) {
+            if (wapId.equals(WapUtil.stagedWapId(snap))) {
+              if (matchingSnap != null) {
+                throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+                    wapId);
+              } else {
+                matchingSnap = snap;
+              }
+            }
+          }
+
+          if (matchingSnap == null) {
             throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
           }
 
-          long wapSnapshotId = wapSnapshot.get().snapshotId();
+          long wapSnapshotId = matchingSnap.snapshotId();
           table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
           Snapshot currentSnapshot = table.currentSnapshot();
-
           InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
           return asScanIterator(OUTPUT_TYPE, outputRow);
         });
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 4958fde15d..c72770e1ce 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -159,6 +159,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
         .hasMessage("Cannot apply unknown WAP ID 'not_valid'");
   }
 
+  @TestTemplate
+  public void testApplyDuplicateWapId() {
+
+    String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+  }
+
   @TestTemplate
   public void testInvalidApplyWapChangesCases() {
     assertThatThrownBy(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 8748882043..8cb0a2bfb7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -19,10 +19,8 @@
 package org.apache.iceberg.spark.procedures;
 
 import java.util.Iterator;
-import java.util.Optional;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.WapUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -97,21 +95,26 @@ class PublishChangesProcedure extends BaseProcedure {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Optional<Snapshot> wapSnapshot =
-              Optional.ofNullable(
-                  Iterables.find(
-                      table.snapshots(),
-                      snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
-                      null));
-          if (!wapSnapshot.isPresent()) {
+          Snapshot matchingSnap = null;
+          for (Snapshot snap : table.snapshots()) {
+            if (wapId.equals(WapUtil.stagedWapId(snap))) {
+              if (matchingSnap != null) {
+                throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+                    wapId);
+              } else {
+                matchingSnap = snap;
+              }
+            }
+          }
+
+          if (matchingSnap == null) {
             throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
           }
 
-          long wapSnapshotId = wapSnapshot.get().snapshotId();
+          long wapSnapshotId = matchingSnap.snapshotId();
           table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
           Snapshot currentSnapshot = table.currentSnapshot();
-
           InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
           return asScanIterator(OUTPUT_TYPE, outputRow);
         });

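For reference, a minimal Spark SQL sketch of the user-visible behavior this change introduces. The catalog and table names below are placeholders, and it assumes write-audit-publish is enabled on the table via the `write.wap.enabled` property:

    ALTER TABLE db.tbl SET TBLPROPERTIES ('write.wap.enabled' = 'true');
    SET spark.wap.id = wap_id_1;

    -- each INSERT stages a separate snapshot carrying wap.id = 'wap_id_1'
    INSERT INTO db.tbl VALUES (1, 'a');
    INSERT INTO db.tbl VALUES (2, 'b');

    -- previously the first matching snapshot was cherry-picked; the procedure now fails with:
    --   Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'
    CALL spark_catalog.system.publish_changes('db.tbl', 'wap_id_1');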