bibhu107 opened a new issue, #11697:
URL: https://github.com/apache/hudi/issues/11697

   I'm encountering a ClassCastException when using Record Level Indexing (RLI) 
with Hudi 0.14.1 and Spark 3.4. The issue occurs on the second run of my script 
after restarting the Spark shell.
   
   Steps to reproduce:
   1. Start the Spark shell with Hudi 0.14.1 using the command below
   2. Run the attached script (performs two upserts)
   3. Close the Spark shell
   4. Restart the Spark shell
   5. Run the same script again
   
   
   ```
   export SPARK_VERSION=3.4
   $SPARK_HOME/bin/spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
   ```
   Simple upsert script using record level indexing:
   ```
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.common.config.HoodieMetadataConfig
   import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.{DataFrame, Row, SparkSession}
   
   import java.sql.Timestamp
   import java.text.SimpleDateFormat
   
   object HudiRLIMain {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .appName("Hudi RLI Test")
         .master("local[*]")
         .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
         .config("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
         .config("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
         .config("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
         .getOrCreate()
   
       val HUDI_DATA_FORMAT = "org.apache.hudi"
       val basePath = "/Users/SparkProject/HudiRLITesting/outputpath"
       val outputTableName = "Idempotent"
       val primaryKey = "lookupKey"
       val RECORD_LEVEL_INDEX = "RECORD_INDEX"
       try {
         val hudiOptions = Map[String, String](
           RECORDKEY_FIELD_OPT_KEY -> primaryKey,
           PRECOMBINE_FIELD_OPT_KEY -> "booking",
           PARTITIONPATH_FIELD_OPT_KEY -> "Data",
           HoodieWriteConfig.TABLE_NAME -> outputTableName,
           HoodieIndexConfig.INDEX_TYPE_PROP -> RECORD_LEVEL_INDEX,
           HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key() -> "true"
         )
   
         // Upsert the batch into the Hudi table at basePath.
         def writeToHudi(df: DataFrame): Unit = {
           df.write.format(HUDI_DATA_FORMAT)
             .options(hudiOptions)
             .mode("append")
             .save(basePath)
         }

         // Snapshot read of the table's current state.
         def readFromHudi(): DataFrame = {
           spark.read.format(HUDI_DATA_FORMAT)
             .option("hoodie.datasource.query.type", "snapshot")
             .load(s"$basePath/*")
         }
   
         val df1 = readDummyPaths(spark)
         writeToHudi(df1)
         println("First Time Insertion Done! Now We gonna Read the Table")
         readFromHudi().show(10, false)
   
         val df2 = readMoreDummyPaths(spark)
         writeToHudi(df2)
         println("Updated Successfully! Now We gonna Read the Table")
         readFromHudi().show(10, false)
   
       } finally {
         spark.stop()
       }
     }
   
   
     private val schema = StructType(Seq(
       StructField("lookupKey", StringType, nullable = false),
       StructField("row_id", StringType, nullable = false),
       StructField("Name", StringType, nullable = true),
       StructField("Salary", LongType, nullable = true),
       StructField("booking", TimestampType, nullable = true)
     ))
   
     private def createDataFrame(spark: SparkSession, data: Seq[Row]): DataFrame = {
       spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
     }
   
     def readDummyPaths(spark: SparkSession): DataFrame = {
       val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
       val data = Seq(
         Row("1", "111", "mayawei", 32232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
         Row("2", "222", "jaya", 23232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
         Row("3", "333", "pritya", 232332L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime))
       )
       createDataFrame(spark, data)
     }
   
     def readMoreDummyPaths(spark: SparkSession): DataFrame = {
       val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
       val data = Seq(
         Row("1", "112", "mayawei", 32232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
         Row("2", "223", "jaya", 23232L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime)),
         Row("3", "334", "pritya", 232332L, new Timestamp(dateFormat.parse("2024-04-03T09:04:21.786Z").getTime))
       )
       createDataFrame(spark, data)
     }
   }
   ```
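   For anyone reproducing this: the state of the record index between runs can be checked by reading the metadata table directly, since it is itself a Hudi table stored under the base path. A minimal sketch (assumes the standard `.hoodie/metadata` location used by Hudi's metadata table; `key` and `type` are fields of the metadata payload schema):
   ```
   // Read the internal metadata table to see which index records exist
   // before and after the second run.
   val mdt = spark.read.format("org.apache.hudi")
     .load(s"$basePath/.hoodie/metadata")
   mdt.select("key", "type").show(10, false)
   ```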
   
   Issue:
   1. On the first run, the script upserts twice and works correctly.
   2. On the second run, after closing and restarting the shell and running the same script, it fails with the error below.
   
   
   
   ```
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:84)
        at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:270)
        at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:296)
        at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
        at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:856)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:856)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:414)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
        at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:160)
        at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:108)
        at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:327)
        at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:304)
        at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:275)
        at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
        ... 38 more
   Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader @56b751b1)
        at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:161)
        at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:116)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:828)
        at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:403)
        ... 45 more
   
   24/07/28 12:42:54 WARN HoodieSparkSqlWriterInternal: Closing write client
   ```
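   Note that the ClassCastException message itself names two different classloaders: `GenericData$Record` is resolved by the 'app' loader, while `HoodieDeleteRecordList` is resolved by the Scala REPL's `URLClassLoader`. That suggests the delete-block Avro model and the generic Avro reader are coming from different classpaths inside the shell. A quick sketch to confirm the split from the spark-shell (a diagnostic only, not a fix):
   ```
   // If these print different classloaders in the failing shell, the cast in
   // HoodieDeleteBlock.deserialize will fail exactly as in the trace above.
   println(classOf[org.apache.avro.generic.GenericData.Record].getClassLoader)
   println(Class.forName("org.apache.hudi.avro.model.HoodieDeleteRecordList").getClassLoader)
   ```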

