This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 989a1acc01 Spark 3.3 : Backport `fast_forward` procedure (#8288)
989a1acc01 is described below
commit 989a1acc01d0570c99e055d3a5604c695789ec74
Author: Rakesh Das <[email protected]>
AuthorDate: Mon Aug 14 12:42:15 2023 +0530
Spark 3.3 : Backport `fast_forward` procedure (#8288)
---
.../extensions/TestFastForwardBranchProcedure.java | 191 +++++++++++++++++++++
.../procedures/FastForwardBranchProcedure.java | 94 ++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
3 files changed, 286 insertions(+)
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
new file mode 100644
index 0000000000..37d40957bc
--- /dev/null
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
+ public TestFastForwardBranchProcedure(
+ String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testFastForwardBranchUsingPositionalArgs() {
+ sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ table.refresh();
+
+ Snapshot currSnapshot = table.currentSnapshot();
+ long sourceRef = currSnapshot.snapshotId();
+
+ String newBranch = "testBranch";
+ String tableNameWithBranch = String.format("%s.branch_%s", tableName,
newBranch);
+
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
+ sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);
+
+ table.refresh();
+ long updatedRef = table.snapshot(newBranch).snapshotId();
+
+ assertEquals(
+ "Main branch should not have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b")),
+ sql("SELECT * FROM %s order by id", tableName));
+
+ assertEquals(
+ "Test branch should have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+ sql("SELECT * FROM %s order by id", tableNameWithBranch));
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.fast_forward('%s', '%s', '%s')",
+ catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch);
+
+
assertThat(Arrays.stream(output.get(0)).collect(Collectors.toList()).get(0))
+ .isEqualTo(SnapshotRef.MAIN_BRANCH);
+
+
assertThat(Arrays.stream(output.get(0)).collect(Collectors.toList()).get(1))
+ .isEqualTo(sourceRef);
+
+
assertThat(Arrays.stream(output.get(0)).collect(Collectors.toList()).get(2))
+ .isEqualTo(updatedRef);
+
+ assertEquals(
+ "Main branch should have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+ sql("SELECT * FROM %s order by id", tableName));
+ }
+
+ @Test
+ public void testFastForwardBranchUsingNamedArgs() {
+ sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ String newBranch = "testBranch";
+ String tableNameWithBranch = String.format("%s.branch_%s", tableName,
newBranch);
+
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
+ sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);
+
+ assertEquals(
+ "Main branch should not have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b")),
+ sql("SELECT * FROM %s order by id", tableName));
+
+ assertEquals(
+ "Test branch should have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+ sql("SELECT * FROM %s order by id", tableNameWithBranch));
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.fast_forward(table => '%s', branch => '%s', to =>
'%s')",
+ catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, newBranch);
+
+ assertEquals(
+ "Main branch should now have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+ sql("SELECT * FROM %s order by id", tableName));
+ }
+
+ @Test
+ public void testFastForwardWhenTargetIsNotAncestorFails() {
+ sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ String newBranch = "testBranch";
+ String tableNameWithBranch = String.format("%s.branch_%s", tableName,
newBranch);
+
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, newBranch);
+ sql("INSERT INTO TABLE %s VALUES(3,'c')", tableNameWithBranch);
+
+ assertEquals(
+ "Main branch should not have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b")),
+ sql("SELECT * FROM %s order by id", tableName));
+
+ assertEquals(
+ "Test branch should have the newly inserted record.",
+ ImmutableList.of(row(1, "a"), row(2, "b"), row(3, "c")),
+ sql("SELECT * FROM %s order by id", tableNameWithBranch));
+
+ // Commit a snapshot on main to deviate the branches
+ sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName);
+
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.fast_forward(table => '%s', branch =>
'%s', to => '%s')",
+ catalogName, tableIdent, SnapshotRef.MAIN_BRANCH,
newBranch))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot fast-forward: main is not an ancestor of
testBranch");
+ }
+
+ @Test
+ public void testInvalidFastForwardBranchCases() {
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.fast_forward('test_table', branch =>
'main', to => 'newBranch')",
+ catalogName))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessage("Named and positional arguments cannot be mixed");
+
+ assertThatThrownBy(
+ () ->
+ sql("CALL %s.custom.fast_forward('test_table', 'main',
'newBranch')", catalogName))
+ .isInstanceOf(NoSuchProcedureException.class)
+ .hasMessage("Procedure custom.fast_forward not found");
+
+ assertThatThrownBy(() -> sql("CALL %s.system.fast_forward('test_table',
'main')", catalogName))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessage("Missing required parameters: [to]");
+
+ assertThatThrownBy(
+ () -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')",
catalogName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot handle an empty identifier for argument table");
+ }
+}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
new file mode 100644
index 0000000000..459cc01c46
--- /dev/null
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class FastForwardBranchProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.required("branch", DataTypes.StringType),
+ ProcedureParameter.required("to", DataTypes.StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("branch_updated", DataTypes.StringType, false,
Metadata.empty()),
+ new StructField("previous_ref", DataTypes.LongType, true,
Metadata.empty()),
+ new StructField("updated_ref", DataTypes.LongType, false,
Metadata.empty())
+ });
+
+ public static SparkProcedures.ProcedureBuilder builder() {
+ return new Builder<FastForwardBranchProcedure>() {
+ @Override
+ protected FastForwardBranchProcedure doBuild() {
+ return new FastForwardBranchProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private FastForwardBranchProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+ String source = args.getString(1);
+ String target = args.getString(2);
+
+ return modifyIcebergTable(
+ tableIdent,
+ table -> {
+ long currentRef = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().fastForwardBranch(source, target).commit();
+ long updatedRef = table.currentSnapshot().snapshotId();
+
+ InternalRow outputRow =
+ newInternalRow(UTF8String.fromString(source), currentRef,
updatedRef);
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ @Override
+ public String description() {
+ return "FastForwardBranchProcedure";
+ }
+}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 7ebbb46c3d..b324cd4422 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -55,6 +55,7 @@ public class SparkProcedures {
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
mapBuilder.put("create_changelog_view",
CreateChangelogViewProcedure::builder);
mapBuilder.put("rewrite_position_delete_files",
RewritePositionDeleteFilesProcedure::builder);
+ mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
return mapBuilder.build();
}