pravin1406 opened a new issue, #11138:
URL: https://github.com/apache/hudi/issues/11138

   Hudi version -> 0.14.1
   Spark version -> 3.2.0
   Hadoop version -> 3.1.1
   Hive version -> 3.1.1
   
   Hi,
   I want to use the partial update payload. I have multiple sources that all write into the same Hudi table, and they share one precombine field and one record key.
   
   With reconcile schema set to `true` and the payload class set to `PartialUpdateAvroPayload`, I am able to achieve this through the datasource writer, since schema reconciliation conditions my schema properly.
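
   For context, the datasource write that works looks roughly like this (a minimal sketch, not my exact job; the options mirror the CTAS query further below):

```scala
// Minimal sketch of the working datasource write; recordkey, precombinekey
// and tablename are the same variables used in the CTAS query below.
df.write.format("hudi")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", recordkey)
  .option("hoodie.datasource.write.precombine.field", precombinekey)
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.PartialUpdateAvroPayload")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .option("hoodie.table.name", tablename)
  .mode("append")
  .save("/tmp/pravin/" + tablename)
```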
   
   But the same does not work when using MERGE INTO with spark-sql. It fails with the error below:
   
```
2024-05-02 02:38:02,771 ERROR io.HoodieAppendHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=pravin partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20240502023620823, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}', newLocation='HoodieRecordLocation {instantTime=20240502023755261, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}'}
java.lang.ArrayIndexOutOfBoundsException: 8
    at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.isNullAt(SpecificInternalRow.scala:241)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:128)
    at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:118)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:118)
    at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getInsertValue(ExpressionPayload.scala:247)
    at org.apache.hudi.common.model.HoodieAvroRecord.shouldIgnore(HoodieAvroRecord.java:173)
    at org.apache.hudi.io.HoodieAppendHandle.prepareRecord(HoodieAppendHandle.java:254)
    at org.apache.hudi.io.HoodieAppendHandle.writeToBuffer(HoodieAppendHandle.java:592)
    at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:448)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:338)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:260)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2024-05-02 02:38:03,375 ERROR hudi.HoodieSparkSqlWriterInternal: UPSERT failed with errors
2024-05-02 02:38:03,375 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client
org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    ... 47 elided
```
   Looking at the code, I found two problems. When I created this table with the CTAS query below (using all the relevant options, including table_properties), the table properties were not getting set on the Hive table; I had to set `hoodie.datasource.hive_sync.table_properties` again with all the properties. But even then, schema reconciliation did not seem to work.
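
   For reference, this is roughly how I mean setting them again (a sketch; the key=value string below is a placeholder, not my real property set):

```scala
// Illustrative workaround sketch: re-passing the serialized table properties
// at the session level, since the CTAS did not propagate them to Hive.
// "key1=value1" is a hypothetical placeholder for my actual property string.
val syncTableProps = "key1=value1"
spark.sql("set hoodie.datasource.hive_sync.table_properties=" + syncTableProps)
```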
   
   After debugging and a code walkthrough, I found that the reconcile-schema setting is among the properties that MERGE INTO overrides. I read the comment there, but did not really understand the reasoning behind it. This has effectively blocked me from using Spark SQL to partially update my records.
   
   Can you explain why this override exists, and how we can make this work?
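
   For completeness, this is the kind of session-level override I would expect spark-sql to honor before the merge (a sketch, assuming Hudi picks up `set hoodie.*` session configs for SQL DML), but which the overriding-properties code path appears to defeat:

```scala
// Session-level overrides one would expect to take effect before the MERGE
// INTO. Per the overriding properties I found, reconcile.schema appears to be
// forced back, and the payload class is replaced by ExpressionPayload during
// the merge anyway (as the stack trace above shows).
spark.sql("set hoodie.datasource.write.reconcile.schema=true")
spark.sql("set hoodie.datasource.write.payload.class=" +
  "org.apache.hudi.common.model.PartialUpdateAvroPayload")
```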
   
   CTAS query:

```scala
spark.sql("CREATE TABLE " + tablename +
  " USING org.apache.hudi " +
  " OPTIONS ( " +
  "   primaryKey '" + recordkey + "', " +
  "   path '/tmp/pravin/" + tablename + "', " +
  "   hoodie.table.name '" + tablename + "', " +
  "   hoodie.datasource.write.operation 'upsert', " +
  "   hoodie.datasource.write.precombine.field '" + precombinekey + "', " +
  "   hoodie.datasource.write.recordkey.field '" + recordkey + "', " +
  "   hoodie.datasource.write.payload.class 'org.apache.hudi.common.model.PartialUpdateAvroPayload', " +
  "   hoodie.datasource.write.table.type 'MERGE_ON_READ', " +
  "   hoodie.enable.data.skipping 'true', " +
  "   hoodie.datasource.write.reconcile.schema 'true', " +
  "   hoodie.datasource.hive_sync.support_timestamp 'true', " +
  "   hoodie.upsert.shuffle.parallelism '200', " +
  "   hoodie.index.type 'SIMPLE', " +
  "   hoodie.simple.index.update.partition.path 'true', " +
  "   hoodie.datasource.write.hive_style_partitioning 'true', " +
  "   hoodie.datasource.hive_sync.enable 'true', " +
  "   hoodie.datasource.hive_sync.mode 'HMS', " +
  "   hoodie.datasource.hive_sync.sync_comment 'true', " +
  "   hoodie.datasource.hive_sync.database 'default', " +
  "   hoodie.datasource.hive_sync.table_properties '" + tableProperties + "', " +
  "   hoodie.datasource.hive_sync.table '" + tablename + "', " +
  "   hoodie.schema.on.read.enable 'true' " +
  " ) as select * from merge_source "
)
```
   
   Input data:

```scala
val df = Seq((9, "qwertyuiop", "US", "1", "pravin", "abcd", "SFO"))
  .toDF("EventTime", "transactionId", "Country", "storeNbr", "FullName", "CompanyName", "City")
```
   
   Merge command:

```scala
spark.sql("merge into partial_update_4_rt as target using merge_source as source" +
  " on target." + recordkey + " = source." + recordkey +
  " when matched then update set City = source.City, CompanyName = source.CompanyName," +
  " EventTime = source.EventTime").show
```
   
   
   @codope 
   @ad1happy2go 
   

