[
https://issues.apache.org/jira/browse/HUDI-2357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404961#comment-17404961
]
ASF GitHub Bot commented on HUDI-2357:
--------------------------------------
pengzhiwei2018 commented on a change in pull request #3534:
URL: https://github.com/apache/hudi/pull/3534#discussion_r696311825
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -193,15 +193,16 @@ object InsertIntoHoodieTableCommand extends Logging {
s"[${insertPartitions.keys.mkString(" " )}]" +
s" not equal to the defined partition in
table[${table.partitionColumnNames.mkString(",")}]")
}
- val parameters = withSparkConf(sparkSession, table.storage.properties)()
++ extraOptions
+ val options = table.storage.properties ++ extraOptions
Review comment:
Yes, we have a UT for merge with a CTAS table. But this bug happens
only with the hive meta client, because we were missing some hive table
properties before this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> MERGE INTO doesn't work for tables created using CTAS
> -----------------------------------------------------
>
> Key: HUDI-2357
> URL: https://issues.apache.org/jira/browse/HUDI-2357
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: Spark Integration
> Reporter: Vinoth Govindarajan
> Assignee: pengzhiwei
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.10.0
>
>
> MERGE INTO command doesn't select the correct primary key for tables created
> using CTAS, whereas it works for tables created using CREATE TABLE command.
> I guess we are hitting this issue because the key generator class is set to
> SqlKeyGenerator for tables created using CTAS:
> working use-case:
> {code:java}
> create table h5 (id bigint, name string, ts bigint) using hudi
> options (type = "cow" , primaryKey="id" , preCombineField="ts" );
> merge into h5 as t0
> using (
> select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
> ) t1
> on t1.s_id = t0.id
> when matched then update set *
> when not matched then insert *;
> {code}
> hoodie.properties for working use-case:
> {code:java}
> ➜ analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
> #Properties saved on Wed Aug 25 04:10:33 UTC 2021
> #Wed Aug 25 04:10:33 UTC 2021
> hoodie.table.name=h5
> hoodie.table.recordkey.fields=id
> hoodie.table.type=COPY_ON_WRITE
> hoodie.table.precombine.field=ts
> hoodie.table.partition.fields=
> hoodie.archivelog.folder=archived
> hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
> hoodie.timeline.layout.version=1
> hoodie.table.version=1{code}
>
> Whereas this doesn't work:
> {code:java}
> create table h4 using hudi options (type = "cow" , primaryKey="id" ,
> preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name,
> current_timestamp();
> merge into h3 as t0 using (select '5' as s_id, 'vinoth' as s_name,
> current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update
> set * when not matched then insert *;
> ========ERROR LOG====================
> 544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver -
> Failed in [merge into analytics.h3 as t0using ( select '5' as s_id,
> 'vinoth' as s_name, current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen
> matched then update set *when not matched then insert
> *]java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the
> defined primary key[] in table h3 at
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
> at
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at
> org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at
> org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at
> org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at
> org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at
> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
> at scala.collection.Iterator.foreach(Iterator.scala:941) at
> scala.collection.Iterator.foreach$(Iterator.scala:941) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source) at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)java.lang.IllegalArgumentException:
> Merge Key[id] is not Equal to the defined primary key[] in table h3 at
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
> at
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at
> org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at
> org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at
> org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at
> org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at
> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
> at scala.collection.Iterator.foreach(Iterator.scala:941) at
> scala.collection.Iterator.foreach$(Iterator.scala:941) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
> at
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source) at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala){code}
> hoodie.properties for non-working use-case (CTAS table):
> {code:java}
> ➜ analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
> #Properties saved on Wed Aug 25 04:09:37 UTC 2021
> #Wed Aug 25 04:09:37 UTC 2021
> hoodie.table.name=h4
> hoodie.table.recordkey.fields=id
> hoodie.table.type=COPY_ON_WRITE
> hoodie.table.precombine.field=ts
> hoodie.table.partition.fields=
> hoodie.archivelog.folder=archived
> hoodie.populate.meta.fields=true
> hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
> hoodie.table.base.file.format=PARQUET
> hoodie.timeline.layout.version=1
> hoodie.table.version=1{code}
> This is a blocker for the dbt integration
> https://issues.apache.org/jira/browse/HUDI-2319
> Please try to fix it as part of the 0.9.0 release so that the dbt integration
> can be unblocked.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)