This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d6419b5e68b [HUDI-7057] Support CopyToTableProcedure with partial
column copy (#10025)
d6419b5e68b is described below
commit d6419b5e68bbdadece255576d5dad4ad7ab5f0b2
Author: xuzifu666 <[email protected]>
AuthorDate: Thu Nov 9 18:01:03 2023 +0800
[HUDI-7057] Support CopyToTableProcedure with partial column copy (#10025)
---
.../command/procedures/CopyToTableProcedure.scala | 14 +++++--
.../hudi/procedure/TestCopyToTableProcedure.scala | 46 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 3 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
index ac23595048f..e661f0a35ae 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
@@ -34,7 +34,8 @@ class CopyToTableProcedure extends BaseProcedure with
ProcedureBuilder with Logg
ProcedureParameter.optional(3, "begin_instance_time",
DataTypes.StringType, ""),
ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType,
""),
ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
- ProcedureParameter.optional(6, "save_mode", DataTypes.StringType,
"overwrite")
+ ProcedureParameter.optional(6, "save_mode", DataTypes.StringType,
"overwrite"),
+ ProcedureParameter.optional(7, "columns", DataTypes.StringType, "")
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -55,6 +56,7 @@ class CopyToTableProcedure extends BaseProcedure with
ProcedureBuilder with Logg
val endInstance = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String]
val asOfInstant = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
val saveModeStr = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
+ val columns = getArgValueOrDefault(args,
PARAMETERS(7)).get.asInstanceOf[String]
assert(saveModeStr.nonEmpty, "save_mode(append,overwrite) can not be
null.")
@@ -102,12 +104,18 @@ class CopyToTableProcedure extends BaseProcedure with
ProcedureBuilder with Logg
.mode(saveMode.toString)
.saveAsTable(newTableName)
} else {
- sourceDataFrame.write
+ var selectColumns = Seq[String]()
+ if (!columns.eq("")) {
+ selectColumns = columns.split(",").toStream
+ } else {
+ selectColumns =
sparkSession.read.table(tableName.get.asInstanceOf[String]).schema.fields.toStream.map(_.name)
+ }
+ sourceDataFrame.selectExpr(selectColumns: _*)
+ .write
.mode(saveMode.toString)
.saveAsTable(newTableName)
}
-
Seq(Row(0))
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
index 6866b62f37c..16029680239 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
@@ -59,6 +59,52 @@ class TestCopyToTableProcedure extends
HoodieSparkProcedureTestBase {
}
}
+ test("Test Call copy_to_table Procedure with columns") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+ spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
+
+ val copyTableName = generateTableName
+ // Check required fields
+ checkExceptionContain(s"call
copy_to_table(table=>'$tableName')")(s"Argument: new_table is required")
+
+ val row = spark.sql(s"""call
copy_to_table(table=>'$tableName',new_table=>'$copyTableName')""").collectAsList()
+ assert(row.size() == 1 && row.get(0).get(0) == 0)
+ val copyTableCount = spark.sql(s"""select count(1) from
$copyTableName""").collectAsList()
+ assert(copyTableCount.size() == 1 && copyTableCount.get(0).get(0) == 4)
+
+ val patitialTable = generateTableName
+ spark.sql(s"""call
copy_to_table(table=>'$tableName',new_table=>'$patitialTable',columns=>'id,name')""").collectAsList()
+ checkAnswer(s"select * from $patitialTable")(
+ Seq(1, "a1"),
+ Seq(2, "a2"),
+ Seq(3, "a3"),
+ Seq(4, "a4")
+ )
+
+ }
+ }
+
test("Test Call copy_to_table Procedure with snapshot") {
withTempDir { tmp =>
val tableName = generateTableName