This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eaeeae3fe Spark 3.1, 3.2, 3.3: Allow importing empty tables (#7980) 
(#8063)
3eaeeae3fe is described below

commit 3eaeeae3fe8aa76902be34719a3b0dfe27c8cffb
Author: Rui Li <[email protected]>
AuthorDate: Sat Jul 15 03:29:42 2023 +0800

    Spark 3.1, 3.2, 3.3: Allow importing empty tables (#7980) (#8063)
---
 .../extensions/TestMigrateTableProcedure.java      | 22 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 12 ++++++------
 .../extensions/TestMigrateTableProcedure.java      | 22 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 12 ++++++------
 .../extensions/TestMigrateTableProcedure.java      | 22 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 12 ++++++------
 6 files changed, 84 insertions(+), 18 deletions(-)

diff --git 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 919a513133..d1adee74f2 100644
--- 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -201,4 +201,26 @@ public class TestMigrateTableProcedure extends 
SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "2023/05/30", 
java.sql.Date.valueOf("2023-05-30"))),
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
+
+  @Test
+  public void testMigrateEmptyPartitionedTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
PARTITIONED BY (id) LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
+    Assert.assertEquals(0L, result);
+  }
+
+  @Test
+  public void testMigrateEmptyTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
+    Assert.assertEquals(0L, result);
+  }
 }
diff --git 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index c1216f47ba..ca19a982c9 100644
--- 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -446,12 +446,12 @@ public class SparkTableUtil {
       } else {
         List<SparkPartition> sourceTablePartitions =
             getPartitions(spark, sourceTableIdent, partitionFilter);
-        Preconditions.checkArgument(
-            !sourceTablePartitions.isEmpty(),
-            "Cannot find any partitions in table %s",
-            sourceTableIdent);
-        importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, 
checkDuplicateFiles);
+        if (sourceTablePartitions.isEmpty()) {
+          targetTable.newAppend().commit();
+        } else {
+          importSparkPartitions(
+              spark, sourceTablePartitions, targetTable, spec, stagingDir, 
checkDuplicateFiles);
+        }
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 919a513133..d1adee74f2 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -201,4 +201,26 @@ public class TestMigrateTableProcedure extends 
SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "2023/05/30", 
java.sql.Date.valueOf("2023-05-30"))),
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
+
+  @Test
+  public void testMigrateEmptyPartitionedTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
PARTITIONED BY (id) LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
+    Assert.assertEquals(0L, result);
+  }
+
+  @Test
+  public void testMigrateEmptyTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
+    Assert.assertEquals(0L, result);
+  }
 }
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 4bfff5b2c4..89c69c8cf4 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -440,12 +440,12 @@ public class SparkTableUtil {
       } else {
         List<SparkPartition> sourceTablePartitions =
             getPartitions(spark, sourceTableIdent, partitionFilter);
-        Preconditions.checkArgument(
-            !sourceTablePartitions.isEmpty(),
-            "Cannot find any partitions in table %s",
-            sourceTableIdent);
-        importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, 
checkDuplicateFiles);
+        if (sourceTablePartitions.isEmpty()) {
+          targetTable.newAppend().commit();
+        } else {
+          importSparkPartitions(
+              spark, sourceTablePartitions, targetTable, spec, stagingDir, 
checkDuplicateFiles);
+        }
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
index 49fee09408..b8b5df0990 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java
@@ -201,4 +201,26 @@ public class TestMigrateTableProcedure extends 
SparkExtensionsTestBase {
         ImmutableList.of(row(1L, "2023/05/30", 
java.sql.Date.valueOf("2023-05-30"))),
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
+
+  @Test
+  public void testMigrateEmptyPartitionedTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
PARTITIONED BY (id) LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
+    Assert.assertEquals(0L, result);
+  }
+
+  @Test
+  public void testMigrateEmptyTable() throws Exception {
+    Assume.assumeTrue(catalogName.equals("spark_catalog"));
+    String location = temp.newFolder().toString();
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet 
LOCATION '%s'",
+        tableName, location);
+    Object result = scalarSql("CALL %s.system.migrate('%s')", catalogName, 
tableName);
+    Assert.assertEquals(0L, result);
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 03255e6a7d..37f3f0c6c4 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -441,12 +441,12 @@ public class SparkTableUtil {
       } else {
         List<SparkPartition> sourceTablePartitions =
             getPartitions(spark, sourceTableIdent, partitionFilter);
-        Preconditions.checkArgument(
-            !sourceTablePartitions.isEmpty(),
-            "Cannot find any partitions in table %s",
-            sourceTableIdent);
-        importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, 
checkDuplicateFiles);
+        if (sourceTablePartitions.isEmpty()) {
+          targetTable.newAppend().commit();
+        } else {
+          importSparkPartitions(
+              spark, sourceTablePartitions, targetTable, spec, stagingDir, 
checkDuplicateFiles);
+        }
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(

Reply via email to