This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 01adbe72f [spark] Support migrate_table with options_map  (#4015)
01adbe72f is described below

commit 01adbe72f2940bde3aa584d3fa4422364a8d7c71
Author: xuzifu666 <[email protected]>
AuthorDate: Wed Aug 21 09:42:50 2024 +0800

    [spark] Support migrate_table with options_map  (#4015)
---
 docs/content/spark/procedures.md                   |  3 ++-
 .../spark/procedure/MigrateTableProcedure.java     | 27 +++++++++++++++++--
 .../procedure/MigrateTableProcedureTest.scala      | 30 ++++++++++++++++++++++
 3 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 7bfa6b24f..cba00296e 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -126,8 +126,9 @@ This section introduce all available spark procedures about 
paimon.
             <li>options: the table options of the paimon table to migrate.</li>
             <li>target_table: name of the target paimon table to migrate. If 
not set would keep the same name with origin table</li>
             <li>delete_origin: If had set target_table, can set delete_origin 
to decide whether delete the origin table metadata from hms after migrate. 
Default is true</li>
+            <li>options_map: additional table options given as a map of 
key-value pairs; its entries override the same keys in options.</li>
       </td>
-      <td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', 
options => 'file.format=parquet')</td>
+      <td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', 
options => 'file.format=parquet', options_map => map('k1','v1'))</td>
     </tr>
     <tr>
       <td>migrate_file</td>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 30b97b33f..a34bc9dac 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -27,12 +27,16 @@ import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -53,7 +57,9 @@ public class MigrateTableProcedure extends BaseProcedure {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.optional("options", StringType),
                 ProcedureParameter.optional("delete_origin", BooleanType),
-                ProcedureParameter.optional("target_table", StringType)
+                ProcedureParameter.optional("target_table", StringType),
+                ProcedureParameter.optional(
+                        "options_map", DataTypes.createMapType(StringType, 
StringType))
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -83,6 +89,8 @@ public class MigrateTableProcedure extends BaseProcedure {
         String properties = args.isNullAt(2) ? null : args.getString(2);
         boolean deleteNeed = args.isNullAt(3) || args.getBoolean(3);
         String targetTable = args.isNullAt(4) ? null : args.getString(4);
+        MapData mapData = args.isNullAt(5) ? null : args.getMap(5);
+        Map<String, String> optionMap = mapDataToHashMap(mapData);
 
         Identifier sourceTableId = Identifier.fromString(sourceTable);
         Identifier tmpTableId =
@@ -92,6 +100,9 @@ public class MigrateTableProcedure extends BaseProcedure {
 
         Catalog paimonCatalog = ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog();
 
+        Map<String, String> options = 
ParameterUtils.parseCommaSeparatedKeyValues(properties);
+        options.putAll(optionMap);
+
         try {
             Migrator migrator =
                     TableMigrationUtils.getImporter(
@@ -101,7 +112,7 @@ public class MigrateTableProcedure extends BaseProcedure {
                             sourceTableId.getObjectName(),
                             tmpTableId.getDatabaseName(),
                             tmpTableId.getObjectName(),
-                            
ParameterUtils.parseCommaSeparatedKeyValues(properties));
+                            options);
 
             migrator.deleteOriginTable(deleteNeed);
             migrator.executeMigrate();
@@ -115,6 +126,18 @@ public class MigrateTableProcedure extends BaseProcedure {
         return new InternalRow[] {newInternalRow(true)};
     }
 
+    /**
+     * Converts a Spark {@link MapData} with string keys and string values into a mutable Java
+     * map.
+     *
+     * @param mapData the Spark map to convert; may be {@code null}
+     * @return a new {@link HashMap} containing every entry of {@code mapData}, or an empty map
+     *     when {@code mapData} is {@code null}
+     * @throws IllegalArgumentException if an entry of {@code mapData} has a {@code null} value
+     */
+    public static Map<String, String> mapDataToHashMap(MapData mapData) {
+        Map<String, String> map = new HashMap<>();
+        if (mapData == null) {
+            return map;
+        }
+        for (int index = 0; index < mapData.numElements(); index++) {
+            String key = mapData.keyArray().getUTF8String(index).toString();
+            // A null value would otherwise surface as a bare NullPointerException from
+            // toString(); fail with a message that names the offending key instead.
+            if (mapData.valueArray().isNullAt(index)) {
+                throw new IllegalArgumentException(
+                        "Map value for key '" + key + "' must not be null.");
+            }
+            map.put(key, mapData.valueArray().getUTF8String(index).toString());
+        }
+        return map;
+    }
+
     public static ProcedureBuilder builder() {
         return new BaseProcedure.Builder<MigrateTableProcedure>() {
             @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 538de53dd..7a97c334f 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -46,6 +46,36 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
       }
     })
 
+  // Registers one test per file format: each migrates a table with an extra option passed
+  // through options_map and checks that the option shows up in SHOW CREATE TABLE.
+  Seq("parquet", "orc", "avro").foreach(
+    format => {
+      test(s"Paimon migrate table procedure: migrate $format table with 
options_map") {
+        withTable("hive_tbl") {
+          // create hive table
+          spark.sql(s"""
+                       |CREATE TABLE hive_tbl (id STRING, name STRING, pt 
STRING)
+                       |USING $format
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 
'p2')")
+
+          // migrate without target_table, supplying options and options_map together
+          spark.sql(
+            s"CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl', options => 'file.format=$format', options_map => 
map('orc.encrypt', 'pii:id,name'))")
+
+          // the two inserted rows must still be returned under the same table name
+          checkAnswer(
+            spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+            Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
+
+          // the key/value given in options_map must appear in the SHOW CREATE TABLE output
+          assert(
+            spark
+              .sql("SHOW CREATE TABLE hive_tbl")
+              .collect()
+              .apply(0)
+              .toString()
+              .contains("'orc.encrypt' = 'pii:id,name',"))
+        }
+      }
+    })
+
   Seq("parquet", "orc", "avro").foreach(
     format => {
       test(

Reply via email to