This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
     new 3eaeeae3fe Spark 3.1, 3.2, 3.3: Allow importing empty tables (#7980) (#8063)
3eaeeae3fe is described below
commit 3eaeeae3fe8aa76902be34719a3b0dfe27c8cffb
Author: Rui Li <[email protected]>
AuthorDate: Sat Jul 15 03:29:42 2023 +0800
Spark 3.1, 3.2, 3.3: Allow importing empty tables (#7980) (#8063)
---
.../extensions/TestMigrateTableProcedure.java | 22 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 12 ++++++------
.../extensions/TestMigrateTableProcedure.java | 22 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 12 ++++++------
.../extensions/TestMigrateTableProcedure.java | 22 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 12 ++++++------
6 files changed, 84 insertions(+), 18 deletions(-)
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 919a513133..d1adee74f2 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -201,4 +201,26 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+  @Test
+  public void testMigrateEmptyPartitionedTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+    Assert.assertEquals(0L, result);
+  }
+
+  @Test
+  public void testMigrateEmptyTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+    Assert.assertEquals(0L, result);
+  }
}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index c1216f47ba..ca19a982c9 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -446,12 +446,12 @@ public class SparkTableUtil {
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
-        Preconditions.checkArgument(
-            !sourceTablePartitions.isEmpty(),
-            "Cannot find any partitions in table %s",
-            sourceTableIdent);
-        importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        if (sourceTablePartitions.isEmpty()) {
+          targetTable.newAppend().commit();
+        } else {
+          importSparkPartitions(
+              spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        }
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 919a513133..d1adee74f2 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -201,4 +201,26 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+  @Test
+  public void testMigrateEmptyPartitionedTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+    Assert.assertEquals(0L, result);
+  }
+
+  @Test
+  public void testMigrateEmptyTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+    Assert.assertEquals(0L, result);
+  }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 4bfff5b2c4..89c69c8cf4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -440,12 +440,12 @@ public class SparkTableUtil {
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
-        Preconditions.checkArgument(
-            !sourceTablePartitions.isEmpty(),
-            "Cannot find any partitions in table %s",
-            sourceTableIdent);
-        importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        if (sourceTablePartitions.isEmpty()) {
+          targetTable.newAppend().commit();
+        } else {
+          importSparkPartitions(
+              spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        }
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 49fee09408..b8b5df0990 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -201,4 +201,26 @@ public class TestMigrateTableProcedure extends SparkExtensionsTestBase {
ImmutableList.of(row(1L, "2023/05/30",
java.sql.Date.valueOf("2023-05-30"))),
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+
+  @Test
+  public void testMigrateEmptyPartitionedTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+    Assert.assertEquals(0L, result);
+  }
+
+  @Test
+  public void testMigrateEmptyTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, tableName);
+    Assert.assertEquals(0L, result);
+  }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 03255e6a7d..37f3f0c6c4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -441,12 +441,12 @@ public class SparkTableUtil {
} else {
List<SparkPartition> sourceTablePartitions =
getPartitions(spark, sourceTableIdent, partitionFilter);
-        Preconditions.checkArgument(
-            !sourceTablePartitions.isEmpty(),
-            "Cannot find any partitions in table %s",
-            sourceTableIdent);
-        importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        if (sourceTablePartitions.isEmpty()) {
+          targetTable.newAppend().commit();
+        } else {
+          importSparkPartitions(
+              spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+        }
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(