This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 70b7aa534b API, Core, Spark: Change behavior of fastForward/replace to create the from branch if it does not exist (#9196)
70b7aa534b is described below
commit 70b7aa534b2c79ccd7b6c0e0fd1be980772bb20a
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Mon Jan 22 14:56:43 2024 -0800
API, Core, Spark: Change behavior of fastForward/replace to create the from branch if it does not exist (#9196)
---
.../java/org/apache/iceberg/ManageSnapshots.java | 6 ++-
.../iceberg/UpdateSnapshotReferencesOperation.java | 6 ++-
.../org/apache/iceberg/TestSnapshotManager.java | 35 ++++++++------
.../extensions/TestFastForwardBranchProcedure.java | 54 +++++++++++++---------
.../procedures/FastForwardBranchProcedure.java | 17 +++----
5 files changed, 68 insertions(+), 50 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
index 986bbb6f58..12cd5021fa 100644
--- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
@@ -164,7 +164,8 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
/**
* Replaces the {@code from} branch to point to the {@code to} snapshot. The {@code to} will
- * remain unchanged, and {@code from} branch will retain its retention properties.
+ * remain unchanged, and {@code from} branch will retain its retention properties. If the {@code
+ * from} branch does not exist, it will be created with default retention properties.
*
* @param from Branch to replace
* @param to The branch {@code from} should be replaced with
@@ -175,7 +176,8 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
/**
* Performs a fast-forward of {@code from} up to the {@code to} snapshot if {@code from} is an
* ancestor of {@code to}. The {@code to} will remain unchanged, and {@code from} will retain its
- * retention properties.
+ * retention properties. If the {@code from} branch does not exist, it will be created with
+ * default retention properties.
*
* @param from Branch to fast-forward
* @param to Ref for the {@code from} branch to be fast forwarded to
diff --git a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java
index 2c3c6c1f7e..9d15bf0ee2 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateSnapshotReferencesOperation.java
@@ -120,9 +120,11 @@ class UpdateSnapshotReferencesOperation implements PendingUpdate<Map<String, Sna
Preconditions.checkNotNull(to, "Destination ref cannot be null");
SnapshotRef branchToUpdate = updatedRefs.get(from);
SnapshotRef toRef = updatedRefs.get(to);
- Preconditions.checkArgument(
- branchToUpdate != null, "Branch to update does not exist: %s", from);
Preconditions.checkArgument(toRef != null, "Ref does not exist: %s", to);
+ if (branchToUpdate == null) {
+ return createBranch(from, toRef.snapshotId());
+ }
+
Preconditions.checkArgument(branchToUpdate.isBranch(), "Ref %s is a tag not a branch", from);
// Nothing to replace
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
index d561d697d3..fd22ae24d0 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
@@ -408,14 +408,6 @@ public class TestSnapshotManager extends TableTestBase {
table.ops().refresh().ref("branch1").snapshotId(),
secondSnapshot.snapshotId());
}
- @Test
- public void testReplaceBranchNonExistingBranchToUpdateFails() {
- Assertions.assertThatThrownBy(
- () -> table.manageSnapshots().replaceBranch("non-existing", "other-branch").commit())
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Branch to update does not exist: non-existing");
- }
-
@Test
public void testReplaceBranchNonExistingToBranchFails() {
table.newAppend().appendFile(FILE_A).commit();
@@ -428,12 +420,27 @@ public class TestSnapshotManager extends TableTestBase {
}
@Test
- public void testFastForwardBranchNonExistingFromBranchFails() {
- Assertions.assertThatThrownBy(
- () ->
- table.manageSnapshots().fastForwardBranch("non-existing", "other-branch").commit())
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Branch to update does not exist: non-existing");
+ public void testFastForwardBranchNonExistingFromBranchCreatesTheBranch() {
+ table.newAppend().appendFile(FILE_A).commit();
+ long snapshotId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createBranch("branch1", snapshotId).commit();
+ table.manageSnapshots().fastForwardBranch("new-branch", "branch1").commit();
+
+ Assertions.assertThat(table.ops().current().ref("new-branch").isBranch()).isTrue();
+ Assertions.assertThat(table.ops().current().ref("new-branch").snapshotId())
+ .isEqualTo(snapshotId);
+ }
+
+ @Test
+ public void testReplaceBranchNonExistingFromBranchCreatesTheBranch() {
+ table.newAppend().appendFile(FILE_A).commit();
+ long snapshotId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createBranch("branch1", snapshotId).commit();
+ table.manageSnapshots().replaceBranch("new-branch", "branch1").commit();
+
+ Assertions.assertThat(table.ops().current().ref("new-branch").isBranch()).isTrue();
+ Assertions.assertThat(table.ops().current().ref("new-branch").snapshotId())
+ .isEqualTo(snapshotId);
}
@Test
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
index 99bc862485..0c99c3e07f 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -190,21 +190,8 @@ public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
}
@Test
- public void testFastForwardNonExistingBranchCases() {
+ public void testFastForwardNonExistingToRefFails() {
sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
- table.refresh();
-
- assertThatThrownBy(
- () ->
- sql(
- "CALL %s.system.fast_forward(table => '%s', branch => '%s', to => '%s')",
- catalogName, tableIdent, "non_existing_branch", SnapshotRef.MAIN_BRANCH))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Branch to fast-forward does not exist: non_existing_branch");
-
- sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
assertThatThrownBy(
() ->
sql(
@@ -237,14 +224,37 @@ public class TestFastForwardBranchProcedure extends SparkExtensionsTestBase {
sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableNameWithBranch2);
table.refresh();
Snapshot branch2Snapshot = table.snapshot(branch2);
+ assertThat(
+ sql(
+ "CALL %s.system.fast_forward('%s', '%s', '%s')",
+ catalogName, tableIdent, branch1, branch2))
+ .containsExactly(row(branch1, branch1Snapshot.snapshotId(), branch2Snapshot.snapshotId()));
+ }
- List<Object[]> output =
- sql(
- "CALL %s.system.fast_forward('%s', '%s', '%s')",
- catalogName, tableIdent, branch1, branch2);
- List<Object> outputRow = Arrays.stream(output.get(0)).collect(Collectors.toList());
- assertThat(outputRow.get(0)).isEqualTo(branch1);
- assertThat(outputRow.get(1)).isEqualTo(branch1Snapshot.snapshotId());
- assertThat(outputRow.get(2)).isEqualTo(branch2Snapshot.snapshotId());
+ @Test
+ public void testFastForwardNonExistingFromMainCreatesBranch() {
+ sql("CREATE TABLE %s (id int NOT NULL, data string) USING iceberg", tableName);
+ String branch1 = "branch1";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branch1);
+ String branchIdentifier = String.format("%s.branch_%s", tableName, branch1);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", branchIdentifier);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", branchIdentifier);
+ Table table = validationCatalog.loadTable(tableIdent);
+ table.refresh();
+ Snapshot branch1Snapshot = table.snapshot(branch1);
+
+ assertThat(
+ sql(
+ "CALL %s.system.fast_forward('%s', '%s', '%s')",
+ catalogName, tableIdent, SnapshotRef.MAIN_BRANCH, branch1))
+ .containsExactly(row(SnapshotRef.MAIN_BRANCH, null, branch1Snapshot.snapshotId()));
+
+ // Ensure the same behavior for non-main branches
+ String branch2 = "branch2";
+ assertThat(
+ sql(
+ "CALL %s.system.fast_forward('%s', '%s', '%s')",
+ catalogName, tableIdent, branch2, branch1))
+ .containsExactly(row(branch2, null, branch1Snapshot.snapshotId()));
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
index 83908f284b..11ea5d44c9 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg.spark.procedures;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -73,19 +71,18 @@ public class FastForwardBranchProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
- String source = args.getString(1);
- String target = args.getString(2);
+ String from = args.getString(1);
+ String to = args.getString(2);
return modifyIcebergTable(
tableIdent,
table -> {
- Snapshot currentSnapshot = table.snapshot(source);
- Preconditions.checkArgument(
- currentSnapshot != null, "Branch to fast-forward does not exist: %s", source);
- table.manageSnapshots().fastForwardBranch(source, target).commit();
- long latest = table.snapshot(source).snapshotId();
+ Long snapshotBefore =
+ table.snapshot(from) != null ? table.snapshot(from).snapshotId() : null;
+ table.manageSnapshots().fastForwardBranch(from, to).commit();
+ long snapshotAfter = table.snapshot(from).snapshotId();
InternalRow outputRow =
- newInternalRow(UTF8String.fromString(source), currentSnapshot.snapshotId(), latest);
+ newInternalRow(UTF8String.fromString(from), snapshotBefore, snapshotAfter);
return new InternalRow[] {outputRow};
});
}