This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3fa7f5797 [core] Fix migrate hive partitioned table with null partition (#3445)
3fa7f5797 is described below
commit 3fa7f57977ba378365346edfe2833fb2f09720f8
Author: Zouxxyy <[email protected]>
AuthorDate: Fri May 31 12:48:50 2024 +0800
[core] Fix migrate hive partitioned table with null partition (#3445)
---
.../org/apache/paimon/migrate/FileMetaUtils.java | 14 ++++---
.../apache/paimon/hive/migrate/HiveMigrator.java | 17 +++++---
.../spark/procedure/MigrateTableProcedure.java | 2 +-
.../procedure/MigrateTableProcedureTest.scala | 48 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 7c84c87d7..2e4409eb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -172,7 +172,8 @@ public class FileMetaUtils {
public static BinaryRow writePartitionValue(
RowType partitionRowType,
Map<String, String> partitionValues,
- List<BinaryWriter.ValueSetter> valueSetters) {
+ List<BinaryWriter.ValueSetter> valueSetters,
+ String partitionDefaultName) {
BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
@@ -180,10 +181,13 @@ public class FileMetaUtils {
List<DataField> fields = partitionRowType.getFields();
for (int i = 0; i < fields.size(); i++) {
- Object value =
- TypeUtils.castFromString(
- partitionValues.get(fields.get(i).name()),
fields.get(i).type());
- valueSetters.get(i).setValue(binaryRowWriter, i, value);
+ String partitionName = partitionValues.get(fields.get(i).name());
+ if (partitionName.equals(partitionDefaultName)) {
+ binaryRowWriter.setNullAt(i);
+ } else {
+ Object value = TypeUtils.castFromString(partitionName,
fields.get(i).type());
+ valueSetters.get(i).setValue(binaryRowWriter, i, value);
+ }
}
binaryRowWriter.complete();
return binaryRow;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index a8516c3a4..01c2704de 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -59,6 +59,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Migrate hive table to paimon table. */
public class HiveMigrator implements Migrator {
@@ -77,7 +78,7 @@ public class HiveMigrator implements Migrator {
private final String sourceTable;
private final String targetDatabase;
private final String targetTable;
- private final Map<String, String> options;
+ private final CoreOptions coreOptions;
private Boolean delete = true;
public HiveMigrator(
@@ -94,7 +95,7 @@ public class HiveMigrator implements Migrator {
this.sourceTable = sourceTable;
this.targetDatabase = targetDatabase;
this.targetTable = targetTable;
- this.options = options;
+ this.coreOptions = new CoreOptions(options);
}
public static List<Migrator> databaseMigrators(
@@ -250,8 +251,10 @@ public class HiveMigrator implements Migrator {
List<FieldSchema> fields,
List<FieldSchema> partitionFields,
Map<String, String> hiveTableOptions) {
- HashMap<String, String> paimonOptions = new HashMap<>(this.options);
- paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
+ checkArgument(
+ coreOptions.bucket() == -1,
+ "Hive migrator only support unaware-bucket target table,
bucket should be -1");
+ Map<String, String> paimonOptions = coreOptions.toMap();
// for compatible with hive comment system
if (hiveTableOptions.get("comment") != null) {
paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
@@ -302,7 +305,11 @@ public class HiveMigrator implements Migrator {
String format =
parseFormat(partition.getSd().getSerdeInfo().toString());
String location = partition.getSd().getLocation();
BinaryRow partitionRow =
- FileMetaUtils.writePartitionValue(partitionRowType,
values, valueSetters);
+ FileMetaUtils.writePartitionValue(
+ partitionRowType,
+ values,
+ valueSetters,
+ coreOptions.partitionDefaultName());
Path path =
paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
migrateTasks.add(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 87090dd92..30b97b33f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -109,7 +109,7 @@ public class MigrateTableProcedure extends BaseProcedure {
paimonCatalog.renameTable(tmpTableId, sourceTableId, false);
}
} catch (Exception e) {
- throw new RuntimeException("Call migrate_table error", e);
+ throw new RuntimeException("Call migrate_table error: " +
e.getMessage(), e);
}
return new InternalRow[] {newInternalRow(true)};
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 457b33e51..710d4dbfa 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThatThrownBy
class MigrateTableProcedureTest extends PaimonHiveTestBase {
Seq("parquet", "orc", "avro").foreach(
@@ -93,4 +94,51 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
}
}
})
+
+ test(s"Paimon migrate table procedure: migrate partitioned table with null
partition") {
+ withTable("hive_tbl") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+ |USING parquet
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1), ('2', 'b', null)")
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+ Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+
+ spark.sql(
+ s"""CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl',
+ |options =>
'file.format=parquet,partition.default-name=__HIVE_DEFAULT_PARTITION__')
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+ Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+ }
+ }
+
+ test(s"Paimon migrate table procedure: migrate table with wrong options") {
+ withTable("hive_tbl") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+ |USING parquet
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1)")
+
+ assertThatThrownBy(
+ () =>
+ spark.sql(
+ s"""CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl',
+ |options => 'file.format=parquet,bucket=1')
+ |""".stripMargin))
+ .hasMessageContaining("Hive migrator only support unaware-bucket
target table")
+ }
+ }
}