This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6419b5e68b [HUDI-7057] Support CopyToTableProcedure with partial 
column copy (#10025)
d6419b5e68b is described below

commit d6419b5e68bbdadece255576d5dad4ad7ab5f0b2
Author: xuzifu666 <[email protected]>
AuthorDate: Thu Nov 9 18:01:03 2023 +0800

    [HUDI-7057] Support CopyToTableProcedure with partial column copy (#10025)
---
 .../command/procedures/CopyToTableProcedure.scala  | 14 +++++--
 .../hudi/procedure/TestCopyToTableProcedure.scala  | 46 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
index ac23595048f..e661f0a35ae 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
@@ -34,7 +34,8 @@ class CopyToTableProcedure extends BaseProcedure with 
ProcedureBuilder with Logg
     ProcedureParameter.optional(3, "begin_instance_time", 
DataTypes.StringType, ""),
     ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType, 
""),
     ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
-    ProcedureParameter.optional(6, "save_mode", DataTypes.StringType, 
"overwrite")
+    ProcedureParameter.optional(6, "save_mode", DataTypes.StringType, 
"overwrite"),
+    ProcedureParameter.optional(7, "columns", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -55,6 +56,7 @@ class CopyToTableProcedure extends BaseProcedure with 
ProcedureBuilder with Logg
     val endInstance = getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[String]
     val asOfInstant = getArgValueOrDefault(args, 
PARAMETERS(5)).get.asInstanceOf[String]
     val saveModeStr = getArgValueOrDefault(args, 
PARAMETERS(6)).get.asInstanceOf[String]
+    val columns = getArgValueOrDefault(args, 
PARAMETERS(7)).get.asInstanceOf[String]
 
     assert(saveModeStr.nonEmpty, "save_mode(append,overwrite) can not be 
null.")
 
@@ -102,12 +104,18 @@ class CopyToTableProcedure extends BaseProcedure with 
ProcedureBuilder with Logg
         .mode(saveMode.toString)
         .saveAsTable(newTableName)
     } else {
-      sourceDataFrame.write
+      var selectColumns = Seq[String]()
+      if (!columns.eq("")) {
+        selectColumns = columns.split(",").toStream
+      } else {
+        selectColumns = 
sparkSession.read.table(tableName.get.asInstanceOf[String]).schema.fields.toStream.map(_.name)
+      }
+      sourceDataFrame.selectExpr(selectColumns: _*)
+        .write
         .mode(saveMode.toString)
         .saveAsTable(newTableName)
     }
 
-
     Seq(Row(0))
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
index 6866b62f37c..16029680239 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
@@ -59,6 +59,52 @@ class TestCopyToTableProcedure extends 
HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call copy_to_table Procedure with columns") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+      spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
+
+      val copyTableName = generateTableName
+      // Check required fields
+      checkExceptionContain(s"call 
copy_to_table(table=>'$tableName')")(s"Argument: new_table is required")
+
+      val row = spark.sql(s"""call 
copy_to_table(table=>'$tableName',new_table=>'$copyTableName')""").collectAsList()
+      assert(row.size() == 1 && row.get(0).get(0) == 0)
+      val copyTableCount = spark.sql(s"""select count(1) from 
$copyTableName""").collectAsList()
+      assert(copyTableCount.size() == 1 && copyTableCount.get(0).get(0) == 4)
+
+      val patitialTable = generateTableName
+      spark.sql(s"""call 
copy_to_table(table=>'$tableName',new_table=>'$patitialTable',columns=>'id,name')""").collectAsList()
+      checkAnswer(s"select * from $patitialTable")(
+        Seq(1, "a1"),
+        Seq(2, "a2"),
+        Seq(3, "a3"),
+        Seq(4, "a4")
+      )
+
+    }
+  }
+
   test("Test Call copy_to_table Procedure with snapshot") {
     withTempDir { tmp =>
       val tableName = generateTableName

Reply via email to