This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f98c6411da API, Spark: Supports create branch on empty table (#8072)
f98c6411da is described below

commit f98c6411da7e71604d66c729f7c6d8d0fc9764db
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Aug 1 10:06:08 2023 +0800

    API, Spark: Supports create branch on empty table (#8072)
    
    Co-authored-by: xianyangliu <[email protected]>
---
 .../java/org/apache/iceberg/ManageSnapshots.java   | 13 ++++++
 .../java/org/apache/iceberg/SnapshotManager.java   | 12 ++++++
 .../org/apache/iceberg/TestSnapshotManager.java    | 32 +++++++++++++++
 .../datasources/v2/CreateOrReplaceBranchExec.scala | 17 +++++---
 .../iceberg/spark/extensions/TestBranchDDL.java    | 48 +++++++++++++++++++---
 5 files changed, 112 insertions(+), 10 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
index 81caf3a58d..2fa60472da 100644
--- a/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/ManageSnapshots.java
@@ -83,6 +83,19 @@ public interface ManageSnapshots extends PendingUpdate<Snapshot> {
    */
   ManageSnapshots cherrypick(long snapshotId);
 
+  /**
+   * Create a new branch. If the table has a current snapshot, the branch will point to it.
+   * Otherwise, the branch will point to a newly created empty snapshot.
+   *
+   * @param name branch name
+   * @return this for method chaining
+   * @throws IllegalArgumentException if a branch with the given name already exists
+   */
+  default ManageSnapshots createBranch(String name) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement createBranch(String)");
+  }
+
   /**
    * Create a new branch pointing to the given snapshot id.
    *
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
index f9015c04b8..75ccad0177 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -68,6 +68,23 @@ public class SnapshotManager implements ManageSnapshots {
     return this;
   }
 
+  @Override
+  public ManageSnapshots createBranch(String name) {
+    Snapshot currentSnapshot = transaction.currentMetadata().currentSnapshot();
+    if (currentSnapshot != null) {
+      return createBranch(name, currentSnapshot.snapshotId());
+    }
+
+    // No current snapshot: the branch gets its own newly created empty snapshot. A fast append
+    // to an existing branch would just add a snapshot to it, so reject existing refs up front.
+    if (transaction.currentMetadata().ref(name) != null) {
+      throw new IllegalArgumentException(String.format("Ref %s already exists", name));
+    }
+
+    transaction.newFastAppend().toBranch(name).commit();
+    return this;
+  }
+
   @Override
   public ManageSnapshots createBranch(String name, long snapshotId) {
     updateSnapshotReferencesOperation().createBranch(name, snapshotId);
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
index ba4e85f494..67a383583e 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotManager.java
@@ -230,6 +230,48 @@ public class TestSnapshotManager extends TableTestBase {
             && expectedBranch.equals(SnapshotRef.branchBuilder(snapshotId).build()));
   }
 
+  @Test
+  public void testCreateBranchWithoutSnapshotId() {
+    table.newAppend().appendFile(FILE_A).commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    // Test a basic case of creating a branch
+    table.manageSnapshots().createBranch("branch1").commit();
+    SnapshotRef actualBranch = table.ops().refresh().ref("branch1");
+    Assertions.assertThat(actualBranch).isNotNull();
+    Assertions.assertThat(actualBranch).isEqualTo(SnapshotRef.branchBuilder(snapshotId).build());
+  }
+
+  @Test
+  public void testCreateBranchOnEmptyTable() {
+    table.manageSnapshots().createBranch("branch1").commit();
+
+    SnapshotRef mainSnapshotRef = table.ops().refresh().ref(SnapshotRef.MAIN_BRANCH);
+    Assertions.assertThat(mainSnapshotRef).isNull();
+
+    SnapshotRef branch1SnapshotRef = table.ops().refresh().ref("branch1");
+    Assertions.assertThat(branch1SnapshotRef).isNotNull();
+    Assertions.assertThat(branch1SnapshotRef.minSnapshotsToKeep()).isNull();
+    Assertions.assertThat(branch1SnapshotRef.maxSnapshotAgeMs()).isNull();
+    Assertions.assertThat(branch1SnapshotRef.maxRefAgeMs()).isNull();
+
+    Snapshot snapshot = table.snapshot(branch1SnapshotRef.snapshotId());
+    Assertions.assertThat(snapshot.parentId()).isNull();
+    Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
+  }
+
+  @Test
+  public void testCreateBranchOnEmptyTableFailsWhenRefAlreadyExists() {
+    table.manageSnapshots().createBranch("branch1").commit();
+
+    // main still has no snapshot, so this goes through the empty-snapshot path again
+    Assertions.assertThatThrownBy(() -> table.manageSnapshots().createBranch("branch1").commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Ref branch1 already exists");
+  }
+
   @Test
   public void testCreateBranchFailsWhenRefAlreadyExists() {
     table.newAppend().appendFile(FILE_A).commit();
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index d4328d4b92..2be406e7f3 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec(
           .map(java.lang.Long.valueOf)
           .orNull
 
-        Preconditions.checkArgument(snapshotId != null,
-          "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
-
         val manageSnapshots = iceberg.table().manageSnapshots()
         val refExists = null != iceberg.table().refs().get(branch)
 
+        def safeCreateBranch(): Unit = {
+          if (snapshotId == null) {
+            manageSnapshots.createBranch(branch)
+          } else {
+            manageSnapshots.createBranch(branch, snapshotId)
+          }
+        }
+
         if (create && replace && !refExists) {
-          manageSnapshots.createBranch(branch, snapshotId)
+          safeCreateBranch()
         } else if (replace) {
+          Preconditions.checkArgument(snapshotId != null,
+            "Cannot complete replace branch operation on %s, main has no snapshot", ident)
           manageSnapshots.replaceBranch(branch, snapshotId)
         } else {
           if (refExists && ifNotExists) {
             return Nil
           }
 
-          manageSnapshots.createBranch(branch, snapshotId)
+          safeCreateBranch()
         }
 
         if (branchOptions.numSnapshots.nonEmpty) {
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index 0f00603bb4..a6bf194b3d 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -92,11 +93,25 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
 
   @Test
   public void testCreateBranchOnEmptyTable() {
-    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageContaining(
-            "Cannot complete create or replace branch operation on %s, main has no snapshot",
-            tableName);
+    String branchName = "b1";
+    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+    Assertions.assertThat(mainRef).isNull();
+
+    SnapshotRef ref = table.refs().get(branchName);
+    Assertions.assertThat(ref).isNotNull();
+    Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+    Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+    Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+    Snapshot snapshot = table.snapshot(ref.snapshotId());
+    Assertions.assertThat(snapshot.parentId()).isNull();
+    Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
   }
 
   @Test
@@ -308,6 +323,29 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
     assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
   }
 
+  @Test
+  public void testCreateOrReplaceBranchOnEmptyTable() {
+    String branchName = "b1";
+    sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, branchName);
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+    Assertions.assertThat(mainRef).isNull();
+
+    SnapshotRef ref = table.refs().get(branchName);
+    Assertions.assertThat(ref).isNotNull();
+    Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+    Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+    Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+    Snapshot snapshot = table.snapshot(ref.snapshotId());
+    Assertions.assertThat(snapshot.parentId()).isNull();
+    Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+    Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
+  }
+
   @Test
   public void createOrReplaceWithNonExistingBranch() throws NoSuchTableException {
     Table table = insertRows();

Reply via email to