[
https://issues.apache.org/jira/browse/HUDI-2279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398011#comment-17398011
]
ASF GitHub Bot commented on HUDI-2279:
--------------------------------------
dongkelun commented on a change in pull request #3415:
URL: https://github.com/apache/hudi/pull/3415#discussion_r687624856
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -142,11 +142,25 @@ case class HoodieResolveReferences(sparkSession:
SparkSession) extends Rule[Logi
val resolvedCondition =
condition.map(resolveExpressionFrom(resolvedSource)(_))
val resolvedAssignments = if (isInsertOrUpdateStar(assignments)) {
// assignments is empty means insert * or update set *
- // we fill assign all the source fields to the target fields
- target.output
- .filter(attr => !HoodieSqlUtils.isMetaField(attr.name))
- .zip(resolvedSource.output.filter(attr =>
!HoodieSqlUtils.isMetaField(attr.name)))
- .map { case (targetAttr, sourceAttr) => Assignment(targetAttr,
sourceAttr) }
+ val resolvedSourceOutputWithoutMetaFields =
resolvedSource.output.filter(attr => !HoodieSqlUtils.isMetaField(attr.name))
+ val targetOutputWithoutMetaFields = target.output.filter(attr =>
!HoodieSqlUtils.isMetaField(attr.name))
+ val resolvedSourceColumnNamesWithoutMetaFields =
resolvedSourceOutputWithoutMetaFields.map(_.name)
+ val targetColumnNamesWithoutMetaFields =
targetOutputWithoutMetaFields.map(_.name)
+
+
if(targetColumnNamesWithoutMetaFields.toSet.subsetOf(resolvedSourceColumnNamesWithoutMetaFields.toSet)){
+ //If sourceTable's columns contains all targetTable's columns,
+ //We fill assign all the source fields to the target fields by
column name matching.
+ val sourceColNameAttrMap =
resolvedSourceOutputWithoutMetaFields.map(attr => (attr.name,attr)).toMap
+ targetOutputWithoutMetaFields.map(targetAttr => {
+ val sourceAttr = sourceColNameAttrMap.get(targetAttr.name).get
+ Assignment(targetAttr, sourceAttr)
Review comment:
Okay, thanks for the guidance. That really is better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Support column name matching for insert * and update set * in merge into
> when sourceTable's columns contain all targetTable's columns
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-2279
> URL: https://issues.apache.org/jira/browse/HUDI-2279
> Project: Apache Hudi
> Issue Type: Improvement
> Components: Spark Integration
> Reporter: 董可伦
> Assignee: 董可伦
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.9.0
>
>
> Example:
> {code:java}
> val tableName = generateTableName
> // Create table
> spark.sql(
> s"""
> |create table $tableName (
> | id int,
> | name string,
> | price double,
> | ts long,
> | dt string
> |) using hudi
> | location '${tmp.getCanonicalPath}/$tableName'
> | options (
> | primaryKey ='id',
> | preCombineField = 'ts'
> | )
> """.stripMargin)
> spark.sql(
> s"""
> |merge into $tableName as t0
> |using (
> | select 1 as id, '2021-05-05' as dt, 1002 as ts, 97 as price, 'a1' as
> name union all
> | select 1 as id, '2021-05-05' as dt, 1003 as ts, 98 as price, 'a2' as
> name union all
> | select 2 as id, '2021-05-05' as dt, 1001 as ts, 99 as price, 'a3' as
> name
> | ) as s0
> |on t0.id = s0.id
> |when matched then update set *
> |when not matched then insert *
> |""".stripMargin)
> spark.sql(s"select id, name, price, ts, dt from $tableName").show(){code}
> For now, the result is:
> +---+----------+-----+---+---+
> | id| name|price| ts| dt|
> +---+----------+-----+---+---+
> | 2|2021-05-05| 99.0| 99| a3|
> | 1|2021-05-05| 98.0| 98| a2|
> +---+----------+-----+---+---+
> When the order of the column types of sourceTable is different from that of
> the column types of targetTable:
>
> {code:java}
> spark.sql(
> s"""
> |merge into ${tableName} as t0
> |using (
> | select 1 as id, 'a1' as name, 1002 as ts, '2021-05-05' as dt, 97 as
> price union all
> | select 1 as id, 'a2' as name, 1003 as ts, '2021-05-05' as dt, 98 as
> price union all
> | select 2 as id, 'a3' as name, 1001 as ts, '2021-05-05' as dt, 99 as
> price
> | ) as s0
> |on t0.id = s0.id
> |when matched then update set *
> |when not matched then insert *
> |""".stripMargin){code}
>
> It will throw an exception:
> {code:java}
> [ERROR] 2021-08-05 21:48:53,941 org.apache.hudi.io.HoodieWriteHandle - Error
> writing record HoodieRecord{key=HoodieKey { recordKey=id:2 partitionPath=},
> currentLocation='null', newLocation='null'}
> java.lang.RuntimeException: Error in execute expression:
> org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer.
> Expressions is: [boundreference() AS `id` boundreference() AS `name`
> CAST(boundreference() AS `price` AS DOUBLE) CAST(boundreference() AS `ts` AS
> BIGINT) CAST(boundreference() AS `dt` AS STRING)]
> CodeBody is: {
> ......
> Caused by: java.lang.ClassCastException:
> org.apache.spark.unsafe.types.UTF8String cannot be cast to
> java.lang.Integer
> Caused by: java.lang.ClassCastException:
> org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
> at
> org.apache.hudi.sql.payload.ExpressionPayloadEvaluator_366797ae_4c30_4862_8222_7be486ede4f8.eval(Unknown
> Source) at
> org.apache.spark.sql.hudi.command.payload.ExpressionPayload.org$apache$spark$sql$hudi$command$payload$ExpressionPayload$$evaluate(ExpressionPayload.scala:258)
> ... 18 more{code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)