rangareddy commented on issue #13970:
URL: https://github.com/apache/hudi/issues/13970#issuecomment-3461058429

   Hi Team,
   
I was able to reproduce this issue with Hudi 1.0.2 using the provided sample code.
   
   ```sh
   spark-shell \
   --jars $HOME/hudi_jars/hudi-spark3.5-bundle_2.12-1.0.2.jar \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
   ```
   
   ```scala
   val databaseName = "default"
   val tableName = "my_trips_table_102"
   val basePath = f"file:///tmp/hudi/$tableName"
   
   val columns = Seq("ts","uuid","rider","driver","fare","city")
   
   val data =
     Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
         (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
         (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
         (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
         (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))
   
   val inserts = spark.createDataFrame(data).toDF(columns:_*)
   
   inserts.write.format("hudi").
     option("hoodie.table.name", tableName).
     option("hoodie.database.name", databaseName).
     option("hoodie.datasource.write.table.name", tableName).
     option("hoodie.datasource.write.recordkey.field", "uuid").
     option("hoodie.datasource.write.precombine.field", "ts").
     option("hoodie.datasource.write.partitionpath.field", "city").
     option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
     option("hoodie.metadata.record.index.enable", "true").
     option("hoodie.write.record.positions", "true").
     option("hoodie.spark.sql.merge.into.partial.updates", "true").
     option("hoodie.index.type", "RECORD_INDEX").
     option("hoodie.datasource.hive_sync.enable", "true").
     option("hoodie.datasource.hive_sync.mode", "hms").
     option("hoodie.datasource.hive_sync.database", databaseName).
     option("hoodie.datasource.hive_sync.table", tableName).
     option("hoodie.datasource.hive_sync.partition_fields", "city").
     mode("overwrite").
     save(basePath)
   
   spark.read.format("hudi").load(basePath).show()
   spark.sql(f"SELECT * FROM $databaseName.$tableName").show()
   
   val updatesData =
     Seq((1700000000000L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M_UPDATED", 25.00, "san_francisco"),
         (1700000000001L, "new-uuid-1111", "rider-Z", "driver-Q", 45.00, "london"))
   
   spark.createDataFrame(updatesData).toDF(columns:_*).
     createOrReplaceTempView("updates_source")
   
   spark.sql(f"SELECT * FROM $tableName").show()
   
   val mergeSql =
     s"""
     MERGE INTO default.$tableName AS target
     USING updates_source AS source
     ON target.uuid = source.uuid
     WHEN MATCHED THEN
       UPDATE SET
         target.driver = source.driver,
         target.fare = source.fare
     WHEN NOT MATCHED THEN
       INSERT *
     """
   
   spark.sql(mergeSql)
   
   spark.sql(f"SELECT * FROM $tableName").show()
   ```
   
   The above code fails on the final SELECT with the following exception:
   
   ```java
   25/10/29 16:34:47 ERROR Executor: Exception in task 0.0 in stage 97.0 (TID 499)
   org.apache.spark.SparkException: Encountered error while reading file file:/tmp/hudi/my_trips_table_102/san_francisco/99ea3e6b-0da9-4e93-900d-2345b5a20073-0_0-41-383_20251029162458865.parquet. Details:
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:864)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
        at java.lang.Long.compareTo(Long.java:54)
        at org.apache.hudi.DefaultSparkRecordMerger.partialMerge(DefaultSparkRecordMerger.java:113)
        at org.apache.hudi.common.table.read.FileGroupRecordBuffer.merge(FileGroupRecordBuffer.java:421)
        at org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.hasNextBaseRecord(PositionBasedFileGroupRecordBuffer.java:239)
        at org.apache.hudi.common.table.read.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:134)
        at org.apache.hudi.common.table.read.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:196)
        at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:269)
        at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:341)
        at org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat$$anon$1.hasNext(HoodieFileGroupReaderBasedParquetFileFormat.scala:260)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
        ... 22 more
   ```
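   
   The `Caused by` frames show what goes wrong: `DefaultSparkRecordMerger.partialMerge` compares the base record's ordering value (`ts`, a `Long`) against the partial update's ordering value, which arrives as a non-`Long` because the MERGE never assigns `ts`. A minimal sketch of just that cast failure (the `Integer` value `0` is an illustrative assumption, not taken from the Hudi source):
   
   ```scala
   // Long implements Comparable[Long]; through the erased bridge method
   // compareTo(Object) the argument is cast to Long, so handing it an
   // Integer reproduces the exception in the stack trace above.
   val stored: Comparable[_] = java.lang.Long.valueOf(1695091554788L) // ts from the base parquet file
   val incoming: Comparable[_] = java.lang.Integer.valueOf(0)         // hypothetical default when ts is omitted
   
   stored.asInstanceOf[Comparable[Any]].compareTo(incoming)
   // => java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
   ```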
   
   To resolve this issue on Hudi 1.0.2, explicitly include the precombine field (the table's ordering key) in the **UPDATE SET** clause of the **MERGE INTO** statement:
   
   ```scala
   val mergeSql =
     s"""
     MERGE INTO default.$tableName AS target
     USING updates_source AS source
     ON target.uuid = source.uuid
     WHEN MATCHED THEN
       UPDATE SET
         target.ts = source.ts,
         target.driver = source.driver,
         target.fare = source.fare
     WHEN NOT MATCHED THEN
       INSERT *
     """
   
   spark.sql(mergeSql)
   ```
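   
   As a quick sanity check (reusing `basePath` and the uuids from `updates_source` above), the matched row should now show the updated `driver` and `fare`, and the new `london` row should appear:
   
   ```scala
   // Re-read the table and inspect only the two rows touched by the MERGE.
   spark.read.format("hudi").load(basePath).
     select("ts", "uuid", "rider", "driver", "fare", "city").
     where("uuid IN ('e96c4396-3fad-413a-a942-4cb36106d721', 'new-uuid-1111')").
     show(false)
   ```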
   
   With the latest code, omitting the precombine field (the table's ordering key) from the **MERGE INTO** statement fails fast: the statement is rejected at validation time with the following exception, rather than failing later at read time.
   
   ```scala
   scala> spark.sql(mergeSql)
   org.apache.spark.sql.hudi.command.MergeIntoFieldResolutionException: MERGE INTO field resolution error: No matching assignment found for target table ordering field `ts`
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand$.$anonfun$validateTargetTableAttrExistsInAssignments$1(MergeIntoHoodieTableCommand.scala:1040)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand$.$anonfun$validateTargetTableAttrExistsInAssignments$1$adapted(MergeIntoHoodieTableCommand.scala:1031)
     at scala.collection.Iterator.foreach(Iterator.scala:943)
     at scala.collection.Iterator.foreach$(Iterator.scala:943)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand$.validateTargetTableAttrExistsInAssignments(MergeIntoHoodieTableCommand.scala:1031)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.$anonfun$checkUpdatingActions$2(MergeIntoHoodieTableCommand.scala:950)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.$anonfun$checkUpdatingActions$2$adapted(MergeIntoHoodieTableCommand.scala:944)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.checkUpdatingActions(MergeIntoHoodieTableCommand.scala:944)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.validate(MergeIntoHoodieTableCommand.scala:832)
     at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:274)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
     at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
     at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
     at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
     at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
     at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
     ... 47 elided
   ```

