This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 989a1acc01 Spark 3.3 : Backport `fast_forward` procedure (#8288)
989a1acc01 is described below

commit 989a1acc01d0570c99e055d3a5604c695789ec74
Author: Rakesh Das <[email protected]>
AuthorDate: Mon Aug 14 12:42:15 2023 +0530

    Spark 3.3 : Backport `fast_forward` procedure (#8288)
---
 .../extensions/TestFastForwardBranchProcedure.java | 191 +++++++++++++++++++++
 .../procedures/FastForwardBranchProcedure.java     |  94 ++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 3 files changed, 286 insertions(+)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
new file mode 100644
index 0000000000..37d40957bc
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests for the {@code fast_forward} Spark procedure.
+ *
+ * <p>Each positive test builds a table with two rows on main, creates a branch, commits a third
+ * row to that branch only, and then fast-forwards main to the branch.
+ */
+public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
+  public TestFastForwardBranchProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  /** Drops the table used by the test so every test starts from a clean state. */
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  /** Fast-forwards main using positional arguments and verifies the output row and the data. */
+  @Test
+  public void testFastForwardBranchUsingPositionalArgs() {
+    sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.refresh();
+
+    // Snapshot main points at before the fast-forward.
+    Snapshot currSnapshot = table.currentSnapshot();
+    long sourceRef = currSnapshot.snapshotId();
+
+    String newBranch = "testBranch";
+    String tableNameWithBranch = String.format("%s.branch_%s", tableName, newBranch);
+
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
+    sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);
+
+    // Snapshot the branch points at; main is expected to point here after the fast-forward.
+    table.refresh();
+    long updatedRef = table.snapshot(newBranch).snapshotId();
+
+    assertEquals(
+        "Main branch should not have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b")),
+        sql("SELECT * FROM %s order by id", tableName));
+
+    assertEquals(
+        "Test branch should have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+        sql("SELECT * FROM %s order by id", tableNameWithBranch));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.fast_forward('%s', '%s', '%s')",
+            catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch);
+
+    assertFastForwardOutput(output, SnapshotRef.MAIN_BRANCH, sourceRef, updatedRef);
+
+    assertEquals(
+        "Main branch should have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+        sql("SELECT * FROM %s order by id", tableName));
+  }
+
+  /** Fast-forwards main using named arguments and verifies the output row and the data. */
+  @Test
+  public void testFastForwardBranchUsingNamedArgs() {
+    sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    table.refresh();
+
+    // Snapshot main points at before the fast-forward.
+    long sourceRef = table.currentSnapshot().snapshotId();
+
+    String newBranch = "testBranch";
+    String tableNameWithBranch = String.format("%s.branch_%s", tableName, newBranch);
+
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
+    sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);
+
+    // Snapshot the branch points at; main is expected to point here after the fast-forward.
+    table.refresh();
+    long updatedRef = table.snapshot(newBranch).snapshotId();
+
+    assertEquals(
+        "Main branch should not have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b")),
+        sql("SELECT * FROM %s order by id", tableName));
+
+    assertEquals(
+        "Test branch should have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+        sql("SELECT * FROM %s order by id", tableNameWithBranch));
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
+            catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch);
+
+    // Previously the result of the named-argument call was captured but never checked.
+    assertFastForwardOutput(output, SnapshotRef.MAIN_BRANCH, sourceRef, updatedRef);
+
+    assertEquals(
+        "Main branch should now have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+        sql("SELECT * FROM %s order by id", tableName));
+  }
+
+  /** Verifies that fast-forwarding is rejected once main has a commit the branch lacks. */
+  @Test
+  public void testFastForwardWhenTargetIsNotAncestorFails() {
+    sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    String newBranch = "testBranch";
+    String tableNameWithBranch = String.format("%s.branch_%s", tableName, newBranch);
+
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
+    sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);
+
+    assertEquals(
+        "Main branch should not have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b")),
+        sql("SELECT * FROM %s order by id", tableName));
+
+    assertEquals(
+        "Test branch should have the newly inserted record.",
+        ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+        sql("SELECT * FROM %s order by id", tableNameWithBranch));
+
+    // Commit a snapshot on main to deviate the branches
+    sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName);
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
+                    catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot fast-forward: main is not an ancestor of testBranch");
+  }
+
+  /** Verifies the errors raised for malformed invocations of the procedure. */
+  @Test
+  public void testInvalidFastForwardBranchCases() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.fast_forward('test_table', branch => 'main', to => 'newBranch')",
+                    catalogName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessage("Named and positional arguments cannot be mixed");
+
+    assertThatThrownBy(
+            () ->
+                sql("CALL %s.custom.fast_forward('test_table', 'main', 'newBranch')", catalogName))
+        .isInstanceOf(NoSuchProcedureException.class)
+        .hasMessage("Procedure custom.fast_forward not found");
+
+    assertThatThrownBy(() -> sql("CALL %s.system.fast_forward('test_table', 'main')", catalogName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessage("Missing required parameters: [to]");
+
+    assertThatThrownBy(
+            () -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", catalogName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot handle an empty identifier for argument table");
+  }
+
+  /**
+   * Asserts that the procedure returned exactly one row of (branch, previous ref, updated ref).
+   *
+   * @param output rows returned by the {@code CALL} statement
+   * @param expectedBranch name of the branch that was fast-forwarded
+   * @param expectedPreviousRef snapshot id the branch pointed at before the call
+   * @param expectedUpdatedRef snapshot id the branch points at after the call
+   */
+  private static void assertFastForwardOutput(
+      List<Object[]> output,
+      String expectedBranch,
+      long expectedPreviousRef,
+      long expectedUpdatedRef) {
+    assertThat(output).hasSize(1);
+
+    // Convert the row once instead of re-streaming the array for every column.
+    List<Object> outputRow = Arrays.stream(output.get(0)).collect(Collectors.toList());
+    assertThat(outputRow.get(0)).isEqualTo(expectedBranch);
+    assertThat(outputRow.get(1)).isEqualTo(expectedPreviousRef);
+    assertThat(outputRow.get(2)).isEqualTo(expectedUpdatedRef);
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
new file mode 100644
index 0000000000..459cc01c46
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A procedure that fast-forwards one branch of an Iceberg table to the snapshot referenced by
+ * another branch.
+ *
+ * <p>Takes three required string arguments ({@code table}, {@code branch}, {@code to}) and returns
+ * a single row of ({@code branch_updated}, {@code previous_ref}, {@code updated_ref}).
+ */
+public class FastForwardBranchProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.required("branch", DataTypes.StringType),
+        ProcedureParameter.required("to", DataTypes.StringType)
+      };
+
+  // previous_ref is the only nullable column: the branch may not resolve to a snapshot yet.
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("branch_updated", DataTypes.StringType, false, Metadata.empty()),
+            new StructField("previous_ref", DataTypes.LongType, true, Metadata.empty()),
+            new StructField("updated_ref", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  /** Returns a builder that creates this procedure for the builder's table catalog. */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<FastForwardBranchProcedure>() {
+      @Override
+      protected FastForwardBranchProcedure doBuild() {
+        return new FastForwardBranchProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private FastForwardBranchProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Fast-forwards the branch named by argument 1 to the ref named by argument 2 on the table
+   * named by argument 0.
+   *
+   * @param args the procedure arguments in {@link #parameters()} order: table, branch, to
+   * @return one row holding the branch name, the snapshot id the branch referenced before the
+   *     call (null if it referenced none), and the snapshot id it references afterwards
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String source = args.getString(1);
+    String target = args.getString(2);
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          // Report the refs of the branch being moved rather than the table's current snapshot:
+          // the two differ whenever `branch` is not main. Keep the previous ref boxed so that a
+          // branch without a snapshot produces a null previous_ref instead of an NPE.
+          Long previousRef =
+              table.snapshot(source) != null ? table.snapshot(source).snapshotId() : null;
+
+          table.manageSnapshots().fastForwardBranch(source, target).commit();
+
+          long updatedRef = table.snapshot(source).snapshotId();
+
+          InternalRow outputRow =
+              newInternalRow(UTF8String.fromString(source), previousRef, updatedRef);
+          return new InternalRow[] {outputRow};
+        });
+  }
+
+  @Override
+  public String description() {
+    return "FastForwardBranchProcedure";
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 7ebbb46c3d..b324cd4422 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -55,6 +55,7 @@ public class SparkProcedures {
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
     mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
+    mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
     return mapBuilder.build();
   }
 

Reply via email to