pravin1406 opened a new issue, #11138:
URL: https://github.com/apache/hudi/issues/11138
HUDI version -> 0.14.1
Spark version -> 3.2.0
hadoop version -> 3.1.1
hive version -> 3.1.1
Hi,
I want to use the partial update payload (`PartialUpdateAvroPayload`). I have multiple sources that all write into the same Hudi table; the sources share one precombine field and one record key.
With reconcile schema set to true and the payload class set to the partial update payload, I'm able to achieve this through the datasource writer, since schema reconciliation conditions my schema properly.
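For context, the datasource write that works for me looks roughly like this (a sketch only; `df`, `recordkey`, `precombinekey`, and `basePath` are placeholders, and the options shown are the ones I rely on):

```scala
// sketch: datasource write path where reconcile schema + partial update works
df.write.format("hudi")
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.PartialUpdateAvroPayload")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .option("hoodie.datasource.write.recordkey.field", recordkey)
  .option("hoodie.datasource.write.precombine.field", precombinekey)
  .option("hoodie.datasource.write.operation", "upsert")
  .mode("append")
  .save(basePath)
```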
But the same is not the case when using MERGE INTO with spark-sql, which fails with the error below.
```
2024-05-02 02:38:02,771 ERROR io.HoodieAppendHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=pravin partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20240502023620823, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}', newLocation='HoodieRecordLocation {instantTime=20240502023755261, fileId=75001b47-689f-41d2-9216-9fbd79502292-0}'}
java.lang.ArrayIndexOutOfBoundsException: 8
	at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.isNullAt(SpecificInternalRow.scala:241)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3(ExpressionPayload.scala:128)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$3$adapted(ExpressionPayload.scala:118)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:118)
	at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getInsertValue(ExpressionPayload.scala:247)
	at org.apache.hudi.common.model.HoodieAvroRecord.shouldIgnore(HoodieAvroRecord.java:173)
	at org.apache.hudi.io.HoodieAppendHandle.prepareRecord(HoodieAppendHandle.java:254)
	at org.apache.hudi.io.HoodieAppendHandle.writeToBuffer(HoodieAppendHandle.java:592)
	at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:448)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:338)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:260)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2024-05-02 02:38:03,375 ERROR hudi.HoodieSparkSqlWriterInternal: UPSERT failed with errors
2024-05-02 02:38:03,375 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client
org.apache.hudi.exception.HoodieException: Merge into Hoodie table command failed
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441)
	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	... 47 elided
```
On looking at the code, I found two problems when I created this table with a CTAS query using all relevant options, including table_properties:
1. The table_properties were not getting set on the Hive table. I had to set `hoodie.datasource.hive_sync.table_properties` again with all the properties, but even then reconcile schema did not seem to work.
2. After debugging and a code walkthrough, I found that the reconcile schema config is among the properties that the MERGE INTO path overrides. I read the comment there, but did not really understand the reasoning behind it.

This has effectively blocked me from using spark-sql to partially update my records.
Can you explain why this override is there, and how we can make this work?
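For what it's worth, the session-level overrides I would have expected to take effect before the MERGE are below (a sketch only; given the override behavior described above, these may still be ignored by the MERGE INTO command):

```scala
// hypothetical workaround sketch: set the writer configs at session level
// before running MERGE INTO; the MERGE path may still override them internally
spark.sql("set hoodie.datasource.write.reconcile.schema=true")
spark.sql("set hoodie.datasource.write.payload.class=org.apache.hudi.common.model.PartialUpdateAvroPayload")
```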
CTAS Query:
```scala
spark.sql("CREATE TABLE " + tablename +
  " USING org.apache.hudi " +
  " OPTIONS ( " +
  "   primaryKey '" + recordkey + "', " +
  "   path '/tmp/pravin/" + tablename + "', " +
  "   hoodie.table.name '" + tablename + "', " +
  "   hoodie.datasource.write.operation 'upsert', " +
  "   hoodie.datasource.write.precombine.field '" + precombinekey + "', " +
  "   hoodie.datasource.write.recordkey.field '" + recordkey + "', " +
  "   hoodie.datasource.write.payload.class 'org.apache.hudi.common.model.PartialUpdateAvroPayload', " +
  "   hoodie.datasource.write.table.type 'MERGE_ON_READ', " +
  "   hoodie.enable.data.skipping 'true', " +
  "   hoodie.datasource.write.reconcile.schema 'true', " +
  "   hoodie.datasource.hive_sync.support_timestamp 'true', " +
  "   hoodie.upsert.shuffle.parallelism '200', " +
  "   hoodie.index.type 'SIMPLE', " +
  "   hoodie.simple.index.update.partition.path 'true', " +
  "   hoodie.datasource.write.hive_style_partitioning 'true', " +
  "   hoodie.datasource.hive_sync.enable 'true', " +
  "   hoodie.datasource.hive_sync.mode 'HMS', " +
  "   hoodie.datasource.hive_sync.sync_comment 'true', " +
  "   hoodie.datasource.hive_sync.database 'default', " +
  "   hoodie.datasource.hive_sync.table_properties '" + tableProperties + "', " +
  "   hoodie.datasource.hive_sync.table '" + tablename + "', " +
  "   hoodie.schema.on.read.enable 'true' " +
  " ) as select * from merge_source "
)
```
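To double-check the first problem, the properties that actually reached the synced Hive table can be listed like this (a sketch; assumes the table was synced into the `default` database as configured above):

```scala
// sketch: inspect which table properties the Hive sync actually set
spark.sql("SHOW TBLPROPERTIES default." + tablename).show(100, false)
```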
input data:
```scala
val df = Seq((9, "qwertyuiop", "US", "1", "pravin", "abcd", "SFO"))
  .toDF("EventTime", "transactionId", "Country", "storeNbr", "FullName", "CompanyName", "City")
// register as the merge_source view referenced by the CTAS and merge queries
df.createOrReplaceTempView("merge_source")
```
merge command:
```scala
spark.sql("merge into partial_update_4_rt as target using merge_source as source " +
  "on target." + recordkey + " = source." + recordkey + " " +
  "when matched then update set City = source.City, CompanyName = source.CompanyName, " +
  "EventTime = source.EventTime").show
```
@codope
@ad1happy2go
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]