This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fa7f5797 [core] Fix migrate hive partitioned table with null partition (#3445)
3fa7f5797 is described below

commit 3fa7f57977ba378365346edfe2833fb2f09720f8
Author: Zouxxyy <[email protected]>
AuthorDate: Fri May 31 12:48:50 2024 +0800

    [core] Fix migrate hive partitioned table with null partition (#3445)
---
 .../org/apache/paimon/migrate/FileMetaUtils.java   | 14 ++++---
 .../apache/paimon/hive/migrate/HiveMigrator.java   | 17 +++++---
 .../spark/procedure/MigrateTableProcedure.java     |  2 +-
 .../procedure/MigrateTableProcedureTest.scala      | 48 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 11 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 7c84c87d7..2e4409eb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -172,7 +172,8 @@ public class FileMetaUtils {
     public static BinaryRow writePartitionValue(
             RowType partitionRowType,
             Map<String, String> partitionValues,
-            List<BinaryWriter.ValueSetter> valueSetters) {
+            List<BinaryWriter.ValueSetter> valueSetters,
+            String partitionDefaultName) {
 
         BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
         BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
@@ -180,10 +181,13 @@ public class FileMetaUtils {
         List<DataField> fields = partitionRowType.getFields();
 
         for (int i = 0; i < fields.size(); i++) {
-            Object value =
-                    TypeUtils.castFromString(
-                            partitionValues.get(fields.get(i).name()), 
fields.get(i).type());
-            valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            String partitionName = partitionValues.get(fields.get(i).name());
+            if (partitionName.equals(partitionDefaultName)) {
+                binaryRowWriter.setNullAt(i);
+            } else {
+                Object value = TypeUtils.castFromString(partitionName, 
fields.get(i).type());
+                valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            }
         }
         binaryRowWriter.complete();
         return binaryRow;
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index a8516c3a4..01c2704de 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -59,6 +59,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Migrate hive table to paimon table. */
 public class HiveMigrator implements Migrator {
@@ -77,7 +78,7 @@ public class HiveMigrator implements Migrator {
     private final String sourceTable;
     private final String targetDatabase;
     private final String targetTable;
-    private final Map<String, String> options;
+    private final CoreOptions coreOptions;
     private Boolean delete = true;
 
     public HiveMigrator(
@@ -94,7 +95,7 @@ public class HiveMigrator implements Migrator {
         this.sourceTable = sourceTable;
         this.targetDatabase = targetDatabase;
         this.targetTable = targetTable;
-        this.options = options;
+        this.coreOptions = new CoreOptions(options);
     }
 
     public static List<Migrator> databaseMigrators(
@@ -250,8 +251,10 @@ public class HiveMigrator implements Migrator {
             List<FieldSchema> fields,
             List<FieldSchema> partitionFields,
             Map<String, String> hiveTableOptions) {
-        HashMap<String, String> paimonOptions = new HashMap<>(this.options);
-        paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
+        checkArgument(
+                coreOptions.bucket() == -1,
+                "Hive migrator only support unaware-bucket target table, 
bucket should be -1");
+        Map<String, String> paimonOptions = coreOptions.toMap();
         // for compatible with hive comment system
         if (hiveTableOptions.get("comment") != null) {
             paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
@@ -302,7 +305,11 @@ public class HiveMigrator implements Migrator {
             String format = 
parseFormat(partition.getSd().getSerdeInfo().toString());
             String location = partition.getSd().getLocation();
             BinaryRow partitionRow =
-                    FileMetaUtils.writePartitionValue(partitionRowType, 
values, valueSetters);
+                    FileMetaUtils.writePartitionValue(
+                            partitionRowType,
+                            values,
+                            valueSetters,
+                            coreOptions.partitionDefaultName());
             Path path = 
paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
 
             migrateTasks.add(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 87090dd92..30b97b33f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -109,7 +109,7 @@ public class MigrateTableProcedure extends BaseProcedure {
                 paimonCatalog.renameTable(tmpTableId, sourceTableId, false);
             }
         } catch (Exception e) {
-            throw new RuntimeException("Call migrate_table error", e);
+            throw new RuntimeException("Call migrate_table error: " + 
e.getMessage(), e);
         }
 
         return new InternalRow[] {newInternalRow(true)};
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 457b33e51..710d4dbfa 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
 import org.apache.paimon.spark.PaimonHiveTestBase
 
 import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThatThrownBy
 
 class MigrateTableProcedureTest extends PaimonHiveTestBase {
   Seq("parquet", "orc", "avro").foreach(
@@ -93,4 +94,51 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
         }
       }
     })
+
+  test(s"Paimon migrate table procedure: migrate partitioned table with null 
partition") {
+    withTable("hive_tbl") {
+      // create hive table
+      spark.sql(s"""
+                   |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+                   |USING parquet
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1), ('2', 'b', null)")
+
+      checkAnswer(
+        spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+        Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+
+      spark.sql(
+        s"""CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl',
+           |options => 
'file.format=parquet,partition.default-name=__HIVE_DEFAULT_PARTITION__')
+           |""".stripMargin)
+
+      checkAnswer(
+        spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+        Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+    }
+  }
+
+  test(s"Paimon migrate table procedure: migrate table with wrong options") {
+    withTable("hive_tbl") {
+      // create hive table
+      spark.sql(s"""
+                   |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+                   |USING parquet
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1)")
+
+      assertThatThrownBy(
+        () =>
+          spark.sql(
+            s"""CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl',
+               |options => 'file.format=parquet,bucket=1')
+               |""".stripMargin))
+        .hasMessageContaining("Hive migrator only support unaware-bucket 
target table")
+    }
+  }
 }

Reply via email to