This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 01adbe72f [spark] Support migrate_table with options_map (#4015)
01adbe72f is described below
commit 01adbe72f2940bde3aa584d3fa4422364a8d7c71
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Aug 21 09:42:50 2024 +0800
[spark] Support migrate_table with options_map (#4015)
---
docs/content/spark/procedures.md | 3 ++-
.../spark/procedure/MigrateTableProcedure.java | 27 +++++++++++++++++--
.../procedure/MigrateTableProcedureTest.scala | 30 ++++++++++++++++++++++
3 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 7bfa6b24f..cba00296e 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -126,8 +126,9 @@ This section introduces all available spark procedures about
paimon.
<li>options: the table options of the paimon table to migrate.</li>
<li>target_table: name of the target paimon table to migrate. If
not set would keep the same name with origin table</li>
<li>delete_origin: If had set target_table, can set delete_origin
to decide whether delete the origin table metadata from hms after migrate.
Default is true</li>
+ <li>options_map: a map of additional key-value options to apply to
the target paimon table.</li>
</td>
- <td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T',
options => 'file.format=parquet')</td>
+ <td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T',
options => 'file.format=parquet', options_map => map('k1','v1'))</td>
</tr>
<tr>
<td>migrate_file</td>
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 30b97b33f..a34bc9dac 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -27,12 +27,16 @@ import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -53,7 +57,9 @@ public class MigrateTableProcedure extends BaseProcedure {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("options", StringType),
ProcedureParameter.optional("delete_origin", BooleanType),
- ProcedureParameter.optional("target_table", StringType)
+ ProcedureParameter.optional("target_table", StringType),
+ ProcedureParameter.optional(
+ "options_map", DataTypes.createMapType(StringType,
StringType))
};
private static final StructType OUTPUT_TYPE =
@@ -83,6 +89,8 @@ public class MigrateTableProcedure extends BaseProcedure {
String properties = args.isNullAt(2) ? null : args.getString(2);
boolean deleteNeed = args.isNullAt(3) || args.getBoolean(3);
String targetTable = args.isNullAt(4) ? null : args.getString(4);
+ MapData mapData = args.isNullAt(5) ? null : args.getMap(5);
+ Map<String, String> optionMap = mapDataToHashMap(mapData);
Identifier sourceTableId = Identifier.fromString(sourceTable);
Identifier tmpTableId =
@@ -92,6 +100,9 @@ public class MigrateTableProcedure extends BaseProcedure {
Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
+ Map<String, String> options =
ParameterUtils.parseCommaSeparatedKeyValues(properties);
+ options.putAll(optionMap);
+
try {
Migrator migrator =
TableMigrationUtils.getImporter(
@@ -101,7 +112,7 @@ public class MigrateTableProcedure extends BaseProcedure {
sourceTableId.getObjectName(),
tmpTableId.getDatabaseName(),
tmpTableId.getObjectName(),
-
ParameterUtils.parseCommaSeparatedKeyValues(properties));
+ options);
migrator.deleteOriginTable(deleteNeed);
migrator.executeMigrate();
@@ -115,6 +126,18 @@ public class MigrateTableProcedure extends BaseProcedure {
return new InternalRow[] {newInternalRow(true)};
}
+ public static Map<String, String> mapDataToHashMap(MapData mapData) {
+ HashMap<String, String> map = new HashMap<>();
+ if (mapData != null) {
+ for (int index = 0; index < mapData.numElements(); index++) {
+ map.put(
+ mapData.keyArray().getUTF8String(index).toString(),
+ mapData.valueArray().getUTF8String(index).toString());
+ }
+ }
+ return map;
+ }
+
public static ProcedureBuilder builder() {
return new BaseProcedure.Builder<MigrateTableProcedure>() {
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 538de53dd..7a97c334f 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -46,6 +46,36 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
}
})
+ Seq("parquet", "orc", "avro").foreach(
+ format => {
+ test(s"Paimon migrate table procedure: migrate $format table with
options_map") {
+ withTable("hive_tbl") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl (id STRING, name STRING, pt
STRING)
+ |USING $format
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b',
'p2')")
+
+ spark.sql(
+ s"CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl', options => 'file.format=$format', options_map =>
map('orc.encrypt', 'pii:id,name'))")
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+ Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
+
+ assert(
+ spark
+ .sql("SHOW CREATE TABLE hive_tbl")
+ .collect()
+ .apply(0)
+ .toString()
+ .contains("'orc.encrypt' = 'pii:id,name',"))
+ }
+ }
+ })
+
Seq("parquet", "orc", "avro").foreach(
format => {
test(