This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8da6f3e551 Spark 3.1, 3.2, 3.3: Supports create branch on empty table (#8317)
8da6f3e551 is described below
commit 8da6f3e551e43d146658ec48afb16f958f564ecf
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Aug 15 23:33:03 2023 +0800
Spark 3.1, 3.2, 3.3: Supports create branch on empty table (#8317)
This change backports PR #8072 to Spark 3.1, 3.2, 3.3.
Co-authored-by: xianyangliu <[email protected]>
---
.../datasources/v2/CreateOrReplaceBranchExec.scala | 17 +++++---
.../iceberg/spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++++---
.../datasources/v2/CreateOrReplaceBranchExec.scala | 17 +++++---
.../iceberg/spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++++---
.../datasources/v2/CreateOrReplaceBranchExec.scala | 17 +++++---
.../iceberg/spark/extensions/TestBranchDDL.java | 48 +++++++++++++++++++---
6 files changed, 165 insertions(+), 30 deletions(-)
diff --git
a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index 651b5c62e1..2ca586838c 100644
---
a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++
b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec(
.map(java.lang.Long.valueOf)
.orNull
- Preconditions.checkArgument(snapshotId != null,
- "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
-
val manageSnapshots = iceberg.table().manageSnapshots()
val refExists = null != iceberg.table().refs().get(branch)
+ def safeCreateBranch(): Unit = {
+ if (snapshotId == null) {
+ manageSnapshots.createBranch(branch)
+ } else {
+ manageSnapshots.createBranch(branch, snapshotId)
+ }
+ }
+
if (create && replace && !refExists) {
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
} else if (replace) {
+ Preconditions.checkArgument(snapshotId != null,
+ "Cannot complete replace branch operation on %s, main has no snapshot", ident)
manageSnapshots.replaceBranch(branch, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
}
if (branchOptions.numSnapshots.nonEmpty) {
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index f23dc58af6..914ee40329 100644
---
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -100,11 +101,25 @@ public class TestBranchDDL extends
SparkExtensionsTestBase {
@Test
public void testCreateBranchOnEmptyTable() {
- Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s",
tableName, "b1"))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "Cannot complete create or replace branch operation on %s, main has no snapshot",
- tableName);
+ String branchName = "b1";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1");
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+ Assertions.assertThat(mainRef).isNull();
+
+ SnapshotRef ref = table.refs().get(branchName);
+ Assertions.assertThat(ref).isNotNull();
+ Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+ Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+ Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+ Snapshot snapshot = table.snapshot(ref.snapshotId());
+ Assertions.assertThat(snapshot.parentId()).isNull();
+ Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}
@Test
@@ -529,6 +544,29 @@ public class TestBranchDDL extends SparkExtensionsTestBase
{
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
}
+ @Test
+ public void testCreateOrReplaceBranchOnEmptyTable() {
+ String branchName = "b1";
+ sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1");
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+ Assertions.assertThat(mainRef).isNull();
+
+ SnapshotRef ref = table.refs().get(branchName);
+ Assertions.assertThat(ref).isNotNull();
+ Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+ Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+ Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+ Snapshot snapshot = table.snapshot(ref.snapshotId());
+ Assertions.assertThat(snapshot.parentId()).isNull();
+ Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
+ }
+
@Test
public void createOrReplaceWithNonExistingBranch() throws
NoSuchTableException {
Table table = insertRows();
diff --git
a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index 142ed13571..ecf1489e08 100644
---
a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++
b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec(
.map(java.lang.Long.valueOf)
.orNull
- Preconditions.checkArgument(snapshotId != null,
- "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
-
val manageSnapshots = iceberg.table().manageSnapshots()
val refExists = null != iceberg.table().refs().get(branch)
+ def safeCreateBranch(): Unit = {
+ if (snapshotId == null) {
+ manageSnapshots.createBranch(branch)
+ } else {
+ manageSnapshots.createBranch(branch, snapshotId)
+ }
+ }
+
if (create && replace && !refExists) {
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
} else if (replace) {
+ Preconditions.checkArgument(snapshotId != null,
+ "Cannot complete replace branch operation on %s, main has no snapshot", ident)
manageSnapshots.replaceBranch(branch, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
}
if (branchOptions.numSnapshots.nonEmpty) {
diff --git
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index 2c3cbac028..adf82d01cd 100644
---
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -101,11 +102,25 @@ public class TestBranchDDL extends
SparkExtensionsTestBase {
@Test
public void testCreateBranchOnEmptyTable() {
- Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s",
tableName, "b1"))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "Cannot complete create or replace branch operation on %s, main has no snapshot",
- tableName);
+ String branchName = "b1";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1");
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+ Assertions.assertThat(mainRef).isNull();
+
+ SnapshotRef ref = table.refs().get(branchName);
+ Assertions.assertThat(ref).isNotNull();
+ Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+ Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+ Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+ Snapshot snapshot = table.snapshot(ref.snapshotId());
+ Assertions.assertThat(snapshot.parentId()).isNull();
+ Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}
@Test
@@ -530,6 +545,29 @@ public class TestBranchDDL extends SparkExtensionsTestBase
{
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
}
+ @Test
+ public void testCreateOrReplaceBranchOnEmptyTable() {
+ String branchName = "b1";
+ sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1");
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+ Assertions.assertThat(mainRef).isNull();
+
+ SnapshotRef ref = table.refs().get(branchName);
+ Assertions.assertThat(ref).isNotNull();
+ Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+ Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+ Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+ Snapshot snapshot = table.snapshot(ref.snapshotId());
+ Assertions.assertThat(snapshot.parentId()).isNull();
+ Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
+ }
+
@Test
public void createOrReplaceWithNonExistingBranch() throws
NoSuchTableException {
Table table = insertRows();
diff --git
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index d4328d4b92..2be406e7f3 100644
---
a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++
b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -48,22 +48,29 @@ case class CreateOrReplaceBranchExec(
.map(java.lang.Long.valueOf)
.orNull
- Preconditions.checkArgument(snapshotId != null,
- "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
-
val manageSnapshots = iceberg.table().manageSnapshots()
val refExists = null != iceberg.table().refs().get(branch)
+ def safeCreateBranch(): Unit = {
+ if (snapshotId == null) {
+ manageSnapshots.createBranch(branch)
+ } else {
+ manageSnapshots.createBranch(branch, snapshotId)
+ }
+ }
+
if (create && replace && !refExists) {
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
} else if (replace) {
+ Preconditions.checkArgument(snapshotId != null,
+ "Cannot complete replace branch operation on %s, main has no snapshot", ident)
manageSnapshots.replaceBranch(branch, snapshotId)
} else {
if (refExists && ifNotExists) {
return Nil
}
- manageSnapshots.createBranch(branch, snapshotId)
+ safeCreateBranch()
}
if (branchOptions.numSnapshots.nonEmpty) {
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index fcc124dee5..e6ac4cfc4d 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -94,11 +95,25 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
@Test
public void testCreateBranchOnEmptyTable() {
- Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s",
tableName, "b1"))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "Cannot complete create or replace branch operation on %s, main has no snapshot",
- tableName);
+ String branchName = "b1";
+ sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1");
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+ Assertions.assertThat(mainRef).isNull();
+
+ SnapshotRef ref = table.refs().get(branchName);
+ Assertions.assertThat(ref).isNotNull();
+ Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+ Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+ Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+ Snapshot snapshot = table.snapshot(ref.snapshotId());
+ Assertions.assertThat(snapshot.parentId()).isNull();
+ Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
}
@Test
@@ -310,6 +325,29 @@ public class TestBranchDDL extends SparkExtensionsTestBase
{
assertThat(table.refs().get(branchName).snapshotId()).isEqualTo(second);
}
+ @Test
+ public void testCreateOrReplaceBranchOnEmptyTable() {
+ String branchName = "b1";
+ sql("ALTER TABLE %s CREATE OR REPLACE BRANCH %s", tableName, "b1");
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SnapshotRef mainRef = table.refs().get(SnapshotRef.MAIN_BRANCH);
+ Assertions.assertThat(mainRef).isNull();
+
+ SnapshotRef ref = table.refs().get(branchName);
+ Assertions.assertThat(ref).isNotNull();
+ Assertions.assertThat(ref.minSnapshotsToKeep()).isNull();
+ Assertions.assertThat(ref.maxSnapshotAgeMs()).isNull();
+ Assertions.assertThat(ref.maxRefAgeMs()).isNull();
+
+ Snapshot snapshot = table.snapshot(ref.snapshotId());
+ Assertions.assertThat(snapshot.parentId()).isNull();
+ Assertions.assertThat(snapshot.addedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDataFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.addedDeleteFiles(table.io())).isEmpty();
+ Assertions.assertThat(snapshot.removedDeleteFiles(table.io())).isEmpty();
+ }
+
@Test
public void createOrReplaceWithNonExistingBranch() throws
NoSuchTableException {
Table table = insertRows();