This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f98c6411da API, Spark: Supports create branch on empty table (#8072)
f98c6411da is described below
commit f98c6411da7e71604d66c729f7c6d8d0fc9764db
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Aug 1 10:06:08 2023 +0800
API, Spark: Supports create branch on empty table (#8072)
Co-authored-by: xianyangliu <[email protected]>
---
.../java/org/apache/iceberg/ManageSnapshots.java | 13 ++++++
.../java/org/apache/iceberg/SnapshotManager.java | 12 ++++++
.../org/apache/iceberg/TestSnapshotManager.java | 32 +++++++++++++++
.../datasources/v2/CreateOrReplaceBranchExec.scala | 17 +++++---
.../iceberg/spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++++---
5 files changed, 112 insertions(+), 10 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
index 81caf3a58d..2fa60472da 100644
--- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
@@ -83,6 +83,19 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
*/
ManageSnapshots cherrypick(long snapshotId);
+  /**
+   * Create a new branch without an explicit snapshot id. If the table has a current snapshot, the
+   * branch points to it; if the current snapshot is null, the branch points to a newly created
+   * empty snapshot instead.
+   *
+   * @param name branch name
+   * @return this for method chaining
+   * @throws IllegalArgumentException if a branch with the given name already exists
+   * @throws UnsupportedOperationException if the implementation does not override this default
+   */
+  default ManageSnapshots createBranch(String name) {
+    // Default for implementations that predate branch creation without a snapshot id
+    throw new UnsupportedOperationException(
+        getClass().getName() + " doesn't implement createBranch(String)");
+  }
+
/**
* Create a new branch pointing to the given snapshot id.
*
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
index f9015c04b8..75ccad0177 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -68,6 +68,18 @@ public class SnapshotManager implements ManageSnapshots {
return this;
}
+  /**
+   * Creates a branch at the transaction's current snapshot, or at a newly committed empty snapshot
+   * when there is no current snapshot.
+   *
+   * @param name branch name
+   * @return this for method chaining
+   * @throws IllegalArgumentException if a branch with the given name already exists
+   */
+  @Override
+  public ManageSnapshots createBranch(String name) {
+    Snapshot currentSnapshot = transaction.currentMetadata().currentSnapshot();
+    if (currentSnapshot != null) {
+      // Point the new branch at the current snapshot
+      return createBranch(name, currentSnapshot.snapshotId());
+    }
+
+    // There is no current snapshot to point at, so an empty snapshot is committed to the branch.
+    // The fast append targets the branch by name and would otherwise commit onto an already
+    // existing branch, so reject a duplicate ref explicitly to honor the documented contract.
+    if (transaction.currentMetadata().ref(name) != null) {
+      throw new IllegalArgumentException(String.format("Ref %s already exists", name));
+    }
+
+    // Create an empty snapshot for the branch
+    transaction.newFastAppend().toBranch(name).commit();
+    return this;
+  }
+
@Override
public ManageSnapshots createBranch(String name, long snapshotId) {
updateSnapshotReferencesOperation().createBranch(name, snapshotId);
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
index ba4e85f494..67a383583e 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
@@ -230,6 +230,38 @@ public class TestSnapshotManager extends TableTestBase {
&&
expectedBranch.equals(SnapshotRef.branchBuilder(snapshotId).build()));
}
+  @Test
+  public void testCreateBranchWithoutSnapshotId() {
+    table.newAppend().appendFile(FILE_A).commit();
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Test a basic case of creating a branch without naming a snapshot id
+    table.manageSnapshots().createBranch("branch1").commit();
+
+    // The branch must exist and point at the snapshot that was current when it was created
+    SnapshotRef createdBranch = table.ops().refresh().ref("branch1");
+    Assertions.assertThat(createdBranch)
+        .isNotNull()
+        .isEqualTo(SnapshotRef.branchBuilder(currentSnapshotId).build());
+  }
+
+  @Test
+  public void testCreateBranchOnEmptyTable() {
+    // No data has been committed, so the branch cannot reuse a current snapshot
+    table.manageSnapshots().createBranch("branch1").commit();
+
+    // Creating the branch must leave main unset
+    Assertions.assertThat(table.ops().refresh().ref(SnapshotRef.MAIN_BRANCH)).isNull();
+
+    // The new branch exists and carries no retention settings of its own
+    SnapshotRef createdRef = table.ops().refresh().ref("branch1");
+    Assertions.assertThat(createdRef).isNotNull();
+    Assertions.assertThat(createdRef.minSnapshotsToKeep()).isNull();
+    Assertions.assertThat(createdRef.maxSnapshotAgeMs()).isNull();
+    Assertions.assertThat(createdRef.maxRefAgeMs()).isNull();
+
+    // Its snapshot is a root snapshot that neither adds nor removes any files
+    Snapshot emptySnapshot = table.snapshot(createdRef.snapshotId());
+    Assertions.assertThat(emptySnapshot.parentId()).isNull();
+    Assertions.assertThat(emptySnapshot.addedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(emptySnapshot.removedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(emptySnapshot.addedDeleteFiles(table.io())).isEmpty();
+    Assertions.assertThat(emptySnapshot.removedDeleteFiles(table.io())).isEmpty();
+  }
+
@Test
public void testCreateBranchFailsWhenRefAlreadyExists() {
table.newAppend().appendFile(FILE_A).commit();
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index d4328d4b92..2be406e7f3 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec(
.map(java.lang.Long.valueOf)
.orNull
- Preconditions.checkArgument(snapshotId != null,
- "Cannot complete create or replace branch operation on %s, main has
no snapshot", ident)
-
val manageSnapshots = iceberg.table().manageSnapshots()
val refExists = null != iceberg.table().refs().get(branch)
+    // Creates `branch` at `snapshotId` when one was resolved; otherwise falls back to
+    // createBranch(name), which per ManageSnapshots#createBranch(String) uses the current
+    // snapshot or, if there is none, a newly created empty snapshot.
+    def safeCreateBranch(): Unit = {
+      if (snapshotId != null) {
+        manageSnapshots.createBranch(branch, snapshotId)
+      } else {
+        manageSnapshots.createBranch(branch)
+      }
+    }
+
if (create && replace && !refExists) {
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
} else if (replace) {
+ Preconditions.checkArgument(snapshotId != null,
+ "Cannot complete replace branch operation on %s, main has no
snapshot", ident)
manageSnapshots.replaceBranch(branch, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
}
if (branchOptions.numSnapshots.nonEmpty) {
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index 0f00603bb4..a6bf194b3d 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -92,11 +93,25 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
@Test
public void testCreateBranchOnEmptyTable() {
- Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s",
tableName, "b1"))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "Cannot complete create or replace branch operation on %s, main
has no snapshot",
- tableName);
+    String branchName = "b1";
+    // Use the same name in the DDL and in the assertions so they cannot drift apart
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // Creating a branch on an empty table must leave main unset
+    SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+    assertThat(mainRef).isNull();
+
+    // The new branch exists and carries no retention settings of its own
+    SnapshotRef ref = table.refs().get(branchName);
+    assertThat(ref).isNotNull();
+    assertThat(ref.minSnapshotsToKeep()).isNull();
+    assertThat(ref.maxSnapshotAgeMs()).isNull();
+    assertThat(ref.maxRefAgeMs()).isNull();
+
+    // Its snapshot is a root snapshot that neither adds nor removes any files
+    Snapshot snapshot = table.snapshot(ref.snapshotId());
+    assertThat(snapshot.parentId()).isNull();
+    assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+    assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+    assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+    assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}
@Test
@@ -308,6 +323,29 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
}
+  @Test
+  public void testCreateOrReplaceBranchOnEmptyTable() {
+    String branchName = "b1";
+    // Use the same name in the DDL and in the assertions so they cannot drift apart
+    sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, branchName);
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // Creating a branch on an empty table must leave main unset
+    SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+    assertThat(mainRef).isNull();
+
+    // The new branch exists and carries no retention settings of its own
+    SnapshotRef ref = table.refs().get(branchName);
+    assertThat(ref).isNotNull();
+    assertThat(ref.minSnapshotsToKeep()).isNull();
+    assertThat(ref.maxSnapshotAgeMs()).isNull();
+    assertThat(ref.maxRefAgeMs()).isNull();
+
+    // Its snapshot is a root snapshot that neither adds nor removes any files
+    Snapshot snapshot = table.snapshot(ref.snapshotId());
+    assertThat(snapshot.parentId()).isNull();
+    assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+    assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+    assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+    assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
+  }
+
@Test
public void createOrReplaceWithNonExistingBranch() throws
NoSuchTableException {
Table table = insertRows();