This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new b9e28e5 [HUDI-2033] ClassCastException Throw When PreCombineField Is String Type (#3099)
b9e28e5 is described below

commit b9e28e5292d9b2a4b665c26eeba660437a6a0a45
Author: pengzhiwei <pengzhiwei2...@icloud.com>
AuthorDate: Thu Jun 17 23:21:20 2021 +0800

    [HUDI-2033] ClassCastException Throw When PreCombineField Is String Type (#3099)
---
 .../hudi/command/payload/ExpressionPayload.scala | 10 ++--
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 61 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 94df155..89da81c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hudi.command.payload
 
 import java.util.{Base64, Properties}
 import java.util.concurrent.Callable
-
 import scala.collection.JavaConverters._
 import com.google.common.cache.CacheBuilder
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.avro.util.Utf8
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
@@ -290,15 +290,15 @@ object ExpressionPayload {
 
   /**
    * As the "baseEvaluator" return "UTF8String" for the string type which cannot be process by
-   * the Avro, The StringConvertEvaluator will convert the "UTF8String" to "String".
+   * the Avro, The StringConvertEvaluator will convert the "UTF8String" to "Utf8".
    */
   case class StringConvertEvaluator(baseEvaluator: IExpressionEvaluator) extends IExpressionEvaluator {
     /**
-     * Convert the UTF8String to String
+     * Convert the UTF8String to Utf8
      */
     override def eval(record: IndexedRecord): Array[AnyRef] = {
-      baseEvaluator.eval(record).map{
-        case s: UTF8String => s.toString
+      baseEvaluator.eval(record).map {
+        case s: UTF8String => new Utf8(s.toString)
         case o => o
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 6a2f79d..969d07b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -532,4 +532,65 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
       }
     }
   }
+
+  test("Test Different Type of PreCombineField") {
+    withTempDir { tmp =>
+      val typeAndValue = Seq(
+        ("string", "'1000'"),
+        ("int", 1000),
+        ("bigint", 10000),
+        ("timestamp", "'2021-05-20 00:00:00'"),
+        ("date", "'2021-05-20'")
+      )
+      typeAndValue.foreach { case (dataType, dataValue) =>
+        val tableName = generateTableName
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | c $dataType
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | options (
+             | primaryKey ='id',
+             | preCombineField = 'c'
+             | )
+       """.stripMargin)
+
+        // First merge with a extra input field 'flag' (insert a new record)
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             | select 1 as id, 'a1' as name, 10 as price, $dataValue as c0, '1' as flag
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched and flag = '1' then update set
+             | id = s0.id, name = s0.name, price = s0.price, c = s0.c0
+             | when not matched and flag = '1' then insert *
+       """.stripMargin)
+        checkAnswer(s"select id, name, price from $tableName")(
+          Seq(1, "a1", 10.0)
+        )
+
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             | select 1 as id, 'a1' as name, 10 as price, $dataValue as c
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched then update set
+             | id = s0.id, name = s0.name, price = s0.price + $tableName.price, c = s0.c
+             | when not matched then insert *
+       """.stripMargin)
+        checkAnswer(s"select id, name, price from $tableName")(
+          Seq(1, "a1", 20.0)
+        )
+      }
+    }
+  }
 }