[ 
https://issues.apache.org/jira/browse/HUDI-2357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Govindarajan updated HUDI-2357:
--------------------------------------
    Description: 
MERGE INTO command doesn't select the correct primary key for tables created 
using CTAS, whereas it works for tables created using CREATE TABLE command.

I guess we are hitting this issue because the key generator class is set to 
SqlKeyGenerator for tables created using CTAS:

working use-case:
{code:java}
create table h5 (id bigint, name string, ts bigint) using hudi
options (type = "cow" , primaryKey="id" , preCombineField="ts" );

merge into h5 as t0
using (
    select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
) t1
on t1.s_id = t0.id
when matched then update set * 
when not matched then insert *;
{code}
hoodie.properties for working use-case:
{code:java}
➜  analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:10:33 UTC 2021
#Wed Aug 25 04:10:33 UTC 2021
hoodie.table.name=h5
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
hoodie.timeline.layout.version=1
hoodie.table.version=1{code}
 

Whereas this doesn't work:
{code:java}
create table h4 using hudi options (type = "cow" , primaryKey="id" , 
preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name, 
current_timestamp();

merge into h3 as t0 using (select '5' as s_id, 'vinoth' as s_name, 
current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update set 
* when not matched then insert *;

========ERROR LOG====================
544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver  - 
Failed in [merge into analytics.h3 as t0using (    select '5' as s_id, 'vinoth' 
as s_name, current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen matched then 
update set *when not matched then insert *]java.lang.IllegalArgumentException: 
Merge Key[id] is not Equal to the defined primary key[] in table h3 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
 at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at 
org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at 
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at 
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
 at scala.collection.Iterator.foreach(Iterator.scala:941) at 
scala.collection.Iterator.foreach$(Iterator.scala:941) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
 at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)java.lang.IllegalArgumentException:
 Merge Key[id] is not Equal to the defined primary key[] in table h3 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
 at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at 
org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at 
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at 
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
 at scala.collection.Iterator.foreach(Iterator.scala:941) at 
scala.collection.Iterator.foreach$(Iterator.scala:941) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
 at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala){code}
hoodie.properties for the non-working use-case (CTAS table):
{code:java}
➜  analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:09:37 UTC 2021
#Wed Aug 25 04:09:37 UTC 2021
hoodie.table.name=h4
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.timeline.layout.version=1
hoodie.table.version=1{code}
This is a blocker for the dbt integration 
https://issues.apache.org/jira/browse/HUDI-2319

Please try to fix it as part of the 0.9.0 release so that the dbt integration 
can be unblocked.

 

  was:
MERGE INTO command doesn't select the correct primary key for tables created 
using CTAS, whereas it works for tables created using CREATE TABLE command.

I guess we are hitting this issue because the key generator class is set to 
SqlKeyGenerator for tables created using CTAS:

working use-case:
{code:java}
create table h5 (id bigint, name string, ts bigint) using hudi
options (type = "cow" , primaryKey="id" , preCombineField="ts" );

merge into h5 as t0
using (
    select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
) t1
on t1.s_id = t0.id
when matched then update set * 
when not matched then insert *;
{code}
hoodie.properties for working use-case:
{code:java}
➜  analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:10:33 UTC 2021
#Wed Aug 25 04:10:33 UTC 2021
hoodie.table.name=h5
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
hoodie.timeline.layout.version=1
hoodie.table.version=1{code}
 

Whereas this doesn't work:
{code:java}
create table h4create table h4using hudioptions (type = "cow" , primaryKey="id" 
, preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name, 
current_timestamp();

merge into h3 as t0using (    select '5' as s_id, 'vinoth' as s_name, 
current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen matched then update set * 
when not matched then insert *;

========ERROR LOG====================
544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver  - 
Failed in [merge into analytics.h3 as t0using (    select '5' as s_id, 'vinoth' 
as s_name, current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen matched then 
update set *when not matched then insert *]java.lang.IllegalArgumentException: 
Merge Key[id] is not Equal to the defined primary key[] in table h3 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
 at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at 
org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at 
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at 
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
 at scala.collection.Iterator.foreach(Iterator.scala:941) at 
scala.collection.Iterator.foreach$(Iterator.scala:941) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
 at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)java.lang.IllegalArgumentException:
 Merge Key[id] is not Equal to the defined primary key[] in table h3 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
 at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
 at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
 at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at 
org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at 
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at 
org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
 at scala.collection.Iterator.foreach(Iterator.scala:941) at 
scala.collection.Iterator.foreach$(Iterator.scala:941) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
 at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
 at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at 
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala){code}
hoodie.properties for not working use-case (CTAS table):
{code:java}
➜  analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:09:37 UTC 2021
#Wed Aug 25 04:09:37 UTC 2021
hoodie.table.name=h4
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.timeline.layout.version=1
hoodie.table.version=1{code}
This is a blocker for the dbt integration 
https://issues.apache.org/jira/browse/HUDI-2319

Please try to fix it as part of the 0.9.0 release so that the dbt integration 
can be unblocked.

 


> MERGE INTO doesn't work for tables created using CTAS
> -----------------------------------------------------
>
>                 Key: HUDI-2357
>                 URL: https://issues.apache.org/jira/browse/HUDI-2357
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Spark Integration
>            Reporter: Vinoth Govindarajan
>            Assignee: pengzhiwei
>            Priority: Major
>             Fix For: 0.9.0
>
>
> MERGE INTO command doesn't select the correct primary key for tables created 
> using CTAS, whereas it works for tables created using CREATE TABLE command.
> I guess we are hitting this issue because the key generator class is set to 
> SqlKeyGenerator for tables created using CTAS:
> working use-case:
> {code:java}
> create table h5 (id bigint, name string, ts bigint) using hudi
> options (type = "cow" , primaryKey="id" , preCombineField="ts" );
> merge into h5 as t0
> using (
>     select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
> ) t1
> on t1.s_id = t0.id
> when matched then update set * 
> when not matched then insert *;
> {code}
> hoodie.properties for working use-case:
> {code:java}
> ➜  analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
> #Properties saved on Wed Aug 25 04:10:33 UTC 2021
> #Wed Aug 25 04:10:33 UTC 2021
> hoodie.table.name=h5
> hoodie.table.recordkey.fields=id
> hoodie.table.type=COPY_ON_WRITE
> hoodie.table.precombine.field=ts
> hoodie.table.partition.fields=
> hoodie.archivelog.folder=archived
> hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
> hoodie.timeline.layout.version=1
> hoodie.table.version=1{code}
>  
> Whereas this doesn't work:
> {code:java}
> create table h4 using hudi options (type = "cow" , primaryKey="id" , 
> preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name, 
> current_timestamp();
> merge into h3 as t0 using (select '5' as s_id, 'vinoth' as s_name, 
> current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update 
> set * when not matched then insert *;
> ========ERROR LOG====================
> 544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver  - 
> Failed in [merge into analytics.h3 as t0using (    select '5' as s_id, 
> 'vinoth' as s_name, current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen 
> matched then update set *when not matched then insert 
> *]java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the 
> defined primary key[] in table h3 at 
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
>  at 
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at 
> org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at 
> org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
> org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at 
> org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
>  at scala.collection.Iterator.foreach(Iterator.scala:941) at 
> scala.collection.Iterator.foreach$(Iterator.scala:941) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
> Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) 
> at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
>  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at 
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at 
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) 
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at 
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)java.lang.IllegalArgumentException:
>  Merge Key[id] is not Equal to the defined primary key[] in table h3 at 
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
>  at 
> org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at 
> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at 
> org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at 
> org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
> org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at 
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at 
> org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
>  at scala.collection.Iterator.foreach(Iterator.scala:941) at 
> scala.collection.Iterator.foreach$(Iterator.scala:941) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
>  at 
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
> Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) 
> at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
>  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at 
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at 
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) 
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at 
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala){code}
> hoodie.properties for the non-working use-case (CTAS table):
> {code:java}
> ➜  analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
> #Properties saved on Wed Aug 25 04:09:37 UTC 2021
> #Wed Aug 25 04:09:37 UTC 2021
> hoodie.table.name=h4
> hoodie.table.recordkey.fields=id
> hoodie.table.type=COPY_ON_WRITE
> hoodie.table.precombine.field=ts
> hoodie.table.partition.fields=
> hoodie.archivelog.folder=archived
> hoodie.populate.meta.fields=true
> hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
> hoodie.table.base.file.format=PARQUET
> hoodie.timeline.layout.version=1
> hoodie.table.version=1{code}
> This is a blocker for the dbt integration 
> https://issues.apache.org/jira/browse/HUDI-2319
> Please try to fix it as part of the 0.9.0 release so that the dbt integration 
> can be unblocked.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to