rangareddy commented on issue #13970:
URL: https://github.com/apache/hudi/issues/13970#issuecomment-3461058429
Hi Team,
I was able to reproduce this issue with Hudi 1.0.2 using the sample code below.
```sh
spark-shell \
  --jars $HOME/hudi_jars/hudi-spark3.5-bundle_2.12-1.0.2.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
```
```scala
val databaseName = "default"
val tableName = "my_trips_table_102"
val basePath = f"file:///tmp/hudi/$tableName"

val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
val data = Seq(
  (1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
  (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
  (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
  (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
  (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai")
)
val inserts = spark.createDataFrame(data).toDF(columns: _*)

// Write a MOR table with record index and partial updates enabled.
inserts.write.format("hudi").
  option("hoodie.table.name", tableName).
  option("hoodie.database.name", databaseName).
  option("hoodie.datasource.write.table.name", tableName).
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.partitionpath.field", "city").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.metadata.record.index.enable", "true").
  option("hoodie.write.record.positions", "true").
  option("hoodie.spark.sql.merge.into.partial.updates", "true").
  option("hoodie.index.type", "RECORD_INDEX").
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.database", databaseName).
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.partition_fields", "city").
  mode("overwrite").
  save(basePath)

spark.read.format("hudi").load(basePath).show()
spark.sql(f"SELECT * FROM $databaseName.$tableName").show()

// One update for an existing key and one brand-new key.
val updatesData = Seq(
  (1700000000000L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M_UPDATED", 25.00, "san_francisco"),
  (1700000000001L, "new-uuid-1111", "rider-Z", "driver-Q", 45.00, "london")
)
spark.createDataFrame(updatesData).toDF(columns: _*).createOrReplaceTempView("updates_source")

spark.sql(f"SELECT * FROM $tableName").show()

// Note: the precombine field ts is deliberately omitted from UPDATE SET.
val mergeSql =
  s"""
    MERGE INTO default.$tableName AS target
    USING updates_source AS source
    ON target.uuid = source.uuid
    WHEN MATCHED THEN
      UPDATE SET
        target.driver = source.driver,
        target.fare = source.fare
    WHEN NOT MATCHED THEN
      INSERT *
  """
spark.sql(mergeSql)
spark.sql(f"SELECT * FROM $tableName").show()
```
The above code fails with the following exception.
```java
25/10/29 16:34:47 ERROR Executor: Exception in task 0.0 in stage 97.0 (TID 499)
org.apache.spark.SparkException: Encountered error while reading file file:/tmp/hudi/my_trips_table_102/san_francisco/99ea3e6b-0da9-4e93-900d-2345b5a20073-0_0-41-383_20251029162458865.parquet. Details:
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:864)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    at java.lang.Long.compareTo(Long.java:54)
    at org.apache.hudi.DefaultSparkRecordMerger.partialMerge(DefaultSparkRecordMerger.java:113)
    at org.apache.hudi.common.table.read.FileGroupRecordBuffer.merge(FileGroupRecordBuffer.java:421)
    at org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.hasNextBaseRecord(PositionBasedFileGroupRecordBuffer.java:239)
    at org.apache.hudi.common.table.read.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:134)
    at org.apache.hudi.common.table.read.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:196)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:269)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:341)
    at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat$$anon$1.hasNext(HoodieFileGroupReaderBasedParquetFileFormat.scala:260)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
    ... 22 more
```
The root cause appears to be in the partial-merge path: with `ts` missing from the partial update, `DefaultSparkRecordMerger.partialMerge` ends up comparing ordering values of mismatched types, hence the `java.lang.Integer cannot be cast to java.lang.Long` in the stack trace above. To resolve the issue, explicitly include the precombine field (the table's ordering key) in the **UPDATE SET** clause of the **MERGE INTO** statement:
```scala
val mergeSql =
  s"""
    MERGE INTO default.$tableName AS target
    USING updates_source AS source
    ON target.uuid = source.uuid
    WHEN MATCHED THEN
      UPDATE SET
        target.ts = source.ts,
        target.driver = source.driver,
        target.fare = source.fare
    WHEN NOT MATCHED THEN
      INSERT *
  """
spark.sql(mergeSql)
```
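For completeness, a quick read-back (a minimal sketch reusing the `basePath` and columns from the snippet above) confirms that the matched row was partially updated and the unmatched row was inserted:
```scala
// Read the table back and inspect the merged rows. Expect
// driver-M_UPDATED with fare 25.0 and ts 1700000000000 for the matched
// uuid, plus a new row in the london partition.
spark.read.format("hudi").load(basePath).
  select("ts", "uuid", "rider", "driver", "fare", "city").
  orderBy("uuid").
  show(false)
```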
With the latest code, omitting the precombine field (the table's ordering key) from the MERGE INTO statement now fails fast: validation rejects the statement before anything is written and throws the following exception.
```scala
scala> spark.sql(mergeSql)
org.apache.spark.sql.hudi.command.MergeIntoFieldResolutionException: MERGE INTO field resolution error: No matching assignment found for target table ordering field `ts`
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand$.$anonfun$validateTargetTableAttrExistsInAssignments$1(MergeIntoHoodieTableCommand.scala:1040)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand$.$anonfun$validateTargetTableAttrExistsInAssignments$1$adapted(MergeIntoHoodieTableCommand.scala:1031)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand$.validateTargetTableAttrExistsInAssignments(MergeIntoHoodieTableCommand.scala:1031)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.$anonfun$checkUpdatingActions$2(MergeIntoHoodieTableCommand.scala:950)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.$anonfun$checkUpdatingActions$2$adapted(MergeIntoHoodieTableCommand.scala:944)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.checkUpdatingActions(MergeIntoHoodieTableCommand.scala:944)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.validate(MergeIntoHoodieTableCommand.scala:832)
  at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:274)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
  ... 47 elided
```
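As a side note, when a full-row update is acceptable, `UPDATE SET *` (standard Spark SQL MERGE syntax that assigns every target column, including `ts`) should also satisfy this validation. A minimal sketch, with `mergeAllSql` being an illustrative name:
```scala
// Sketch (assumes the source and target schemas line up): UPDATE SET *
// assigns every column, including the ordering field ts, so no explicit
// target.ts = source.ts assignment is needed. Note this performs a
// full-row update rather than a partial update.
val mergeAllSql =
  s"""
    MERGE INTO default.$tableName AS target
    USING updates_source AS source
    ON target.uuid = source.uuid
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """
spark.sql(mergeAllSql)
```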