This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cf358a4d15 Spark 3.4: Support setting current snapshot with ref (#8163)
cf358a4d15 is described below

commit cf358a4d150a3c7dcef9d1bccb09986a938aa128
Author: Manu Zhang <[email protected]>
AuthorDate: Wed Aug 16 07:44:05 2023 +0800

    Spark 3.4: Support setting current snapshot with ref (#8163)
---
 docs/spark-procedures.md                           | 10 +++-
 .../TestSetCurrentSnapshotProcedure.java           | 65 ++++++++++++++++++++--
 .../procedures/SetCurrentSnapshotProcedure.java    | 24 ++++++--
 3 files changed, 88 insertions(+), 11 deletions(-)

diff --git a/docs/spark-procedures.md b/docs/spark-procedures.md
index 59fa39dc93..debf393a30 100644
--- a/docs/spark-procedures.md
+++ b/docs/spark-procedures.md
@@ -130,7 +130,10 @@ This procedure invalidates all cached Spark plans that 
reference the affected ta
 | Argument Name | Required? | Type | Description |
 |---------------|-----------|------|-------------|
 | `table`       | ✔️  | string | Name of the table to update |
-| `snapshot_id` | ✔️  | long   | Snapshot ID to set as current |
+| `snapshot_id` | | long   | Snapshot ID to set as current |
+| `ref` | | string | Snapshot Referece (branch or tag) to set as current |
+
+Either `snapshot_id` or `ref` must be provided but not both.
 
 #### Output
 
@@ -146,6 +149,11 @@ Set the current snapshot for `db.sample` to 1:
 CALL catalog_name.system.set_current_snapshot('db.sample', 1)
 ```
 
+Set the current snapshot for `db.sample` to tag `s1`:
+```sql
+CALL catalog_name.system.set_current_snapshot(table => 'db.sample', tag => 
's1');
+```
+
 ### `cherrypick_snapshot`
 
 Cherry-picks changes from a snapshot into the current table state.
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
index 51db8d3210..e894ba4ff0 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
@@ -212,12 +212,12 @@ public class TestSetCurrentSnapshotProcedure extends 
SparkExtensionsTestBase {
 
     Assertions.assertThatThrownBy(
             () -> sql("CALL %s.system.set_current_snapshot('t')", catalogName))
-        .isInstanceOf(AnalysisException.class)
-        .hasMessage("Missing required parameters: [snapshot_id]");
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Either snapshot_id or ref must be provided, not both");
 
     Assertions.assertThatThrownBy(() -> sql("CALL 
%s.system.set_current_snapshot(1L)", catalogName))
-        .isInstanceOf(AnalysisException.class)
-        .hasMessage("Missing required parameters: [snapshot_id]");
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse identifier for arg table: 1");
 
     Assertions.assertThatThrownBy(
             () -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 
1L)", catalogName))
@@ -226,8 +226,8 @@ public class TestSetCurrentSnapshotProcedure extends 
SparkExtensionsTestBase {
 
     Assertions.assertThatThrownBy(
             () -> sql("CALL %s.system.set_current_snapshot(table => 't')", 
catalogName))
-        .isInstanceOf(AnalysisException.class)
-        .hasMessage("Missing required parameters: [snapshot_id]");
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Either snapshot_id or ref must be provided, not both");
 
     Assertions.assertThatThrownBy(
             () -> sql("CALL %s.system.set_current_snapshot('t', 2.2)", 
catalogName))
@@ -238,5 +238,58 @@ public class TestSetCurrentSnapshotProcedure extends 
SparkExtensionsTestBase {
             () -> sql("CALL %s.system.set_current_snapshot('', 1L)", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot handle an empty identifier for argument table");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.set_current_snapshot(table => 't', 
snapshot_id => 1L, ref => 's1')",
+                    catalogName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Either snapshot_id or ref must be provided, not both");
+  }
+
+  @Test
+  public void testSetCurrentSnapshotToRef() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot firstSnapshot = table.currentSnapshot();
+    String ref = "s1";
+    sql("ALTER TABLE %s CREATE TAG %s", tableName, ref);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    table.refresh();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.set_current_snapshot(table => '%s', ref => '%s')",
+            catalogName, tableIdent, ref);
+
+    assertEquals(
+        "Procedure output must match",
+        ImmutableList.of(row(secondSnapshot.snapshotId(), 
firstSnapshot.snapshotId())),
+        output);
+
+    assertEquals(
+        "Set must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    String notExistRef = "s2";
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.set_current_snapshot(table => '%s', ref => 
'%s')",
+                    catalogName, tableIdent, notExistRef))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Cannot find matching snapshot ID for ref " + notExistRef);
   }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
index f8f8049c22..22719e43c0 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
@@ -19,6 +19,10 @@
 package org.apache.iceberg.spark.procedures;
 
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -42,7 +46,8 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.required("snapshot_id", DataTypes.LongType)
+        ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
+        ProcedureParameter.optional("ref", DataTypes.StringType)
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -78,7 +83,11 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
   @Override
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    long snapshotId = args.getLong(1);
+    Long snapshotId = args.isNullAt(1) ? null : args.getLong(1);
+    String ref = args.isNullAt(2) ? null : args.getString(2);
+    Preconditions.checkArgument(
+        (snapshotId != null && ref == null) || (snapshotId == null && ref != 
null),
+        "Either snapshot_id or ref must be provided, not both");
 
     return modifyIcebergTable(
         tableIdent,
@@ -86,9 +95,10 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
           Snapshot previousSnapshot = table.currentSnapshot();
           Long previousSnapshotId = previousSnapshot != null ? 
previousSnapshot.snapshotId() : null;
 
-          table.manageSnapshots().setCurrentSnapshot(snapshotId).commit();
+          long targetSnapshotId = snapshotId != null ? snapshotId : 
toSnapshotId(table, ref);
+          
table.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();
 
-          InternalRow outputRow = newInternalRow(previousSnapshotId, 
snapshotId);
+          InternalRow outputRow = newInternalRow(previousSnapshotId, 
targetSnapshotId);
           return new InternalRow[] {outputRow};
         });
   }
@@ -97,4 +107,10 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
   public String description() {
     return "SetCurrentSnapshotProcedure";
   }
+
+  private long toSnapshotId(Table table, String refName) {
+    SnapshotRef ref = table.refs().get(refName);
+    ValidationException.check(ref != null, "Cannot find matching snapshot ID 
for ref " + refName);
+    return ref.snapshotId();
+  }
 }

Reply via email to