RameshkumarChikoti123 opened a new issue, #12191:
URL: https://github.com/apache/hudi/issues/12191
I am using the record level index (RLI) configuration, and while upserting and deleting I get the error described below.
**Expected behavior**
Upsert and delete should work end to end with the record level index enabled.
**Environment Description**
* Hudi version : 0.15.0
* Spark version : 3.3.0
* Storage : S3
* Hive version : NA
* Running on Docker : Yes
* Hadoop version : 3.3.4
**Steps to reproduce the behaviour:**
```
# Spark configuration
from pyspark import SparkConf
from pyspark.sql import SparkSession

sparkConf = SparkConf()
sparkConf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
sparkConf.set('spark.sql.catalog.spark_catalog.type', 'hadoop')
sparkConf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
sparkConf.set('spark.sql.hudi.enable', 'true')
sparkConf.set('spark.driver.extraClassPath', '/home/jovyan/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')
sparkConf.set('spark.executor.extraClassPath', '/home/jovyan/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar')

# Configuring Hudi-specific packages
sparkConf.set('spark.jars.packages', 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')

# Hudi SQL extensions
sparkConf.set('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')

# Build the SparkSession from the configuration above
# (session construction assumed; the steps below use `spark`)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
hudiOptions = {
    'hoodie.table.name': 'my_hudi_table',
    'hoodie.datasource.write.recordkey.field': 'guid',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.index.type': 'RECORD_INDEX',
    'hoodie.datasource.hive_sync.enable': 'false',
    'hoodie.table.type': 'COPY_ON_WRITE',
    'hoodie.metadata.enable': 'true',
    'hoodie.metadata.record.index.enable': 'true',
    'hoodie.combine.before.upsert': 'true',
    'hoodie.combine.before.insert': 'true'
}
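# Note on the options above: 'hoodie.index.type' = 'RECORD_INDEX' relies on the
# record index in the metadata table, so 'hoodie.metadata.enable' and
# 'hoodie.metadata.record.index.enable' must both remain 'true' (as set here).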
# Insert into Hudi table
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

data = [("9", "Alice", "2024-01-01 00:00:00", "2024-01"),
        ("10", "Bob", "2024-01-01 00:01:00", "2024-01"),
        ("11", "Charlie", "2024-01-02 00:02:00", "2024-01"),
        ("12", "David", "2024-02-01 00:03:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]
schema = StructType([
    StructField("guid", StringType(), True),
    StructField("name", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("partitionpath", StringType(), True)
])
inputDF = spark.createDataFrame(data, schema)
inputDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "insert") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")
# Upsert into Hudi table
# Sample DataFrame for upsert
update_data = [("1", "Alice Updated latest ", "2029-01-13 00:00:00", "2025-11"),
               ("5", "Eve", "2024-02-02 00:04:00", "2024-02")]
columns = ["guid", "name", "timestamp", "partitionpath"]
updateDF = spark.createDataFrame(update_data, columns)

# Upsert DataFrame into Hudi table
updateDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")

# Delete
delete_data = [("1",)]
deleteDF = spark.createDataFrame(delete_data, ["guid"])
deleteDF.write.format("hudi") \
    .options(**hudiOptions) \
    .option("hoodie.datasource.write.operation", "delete") \
    .mode("append") \
    .save("s3a://bucket/var/hudi_poc/my_hudi_table")
```
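For reference, the table can be read back between the steps above to inspect which records are present; a minimal sketch, assuming the same table path and `spark` session as in the snippet:
```
# Minimal read-back sketch: load the Hudi table and show the current records
# (same table path as used in the writes above).
readDF = spark.read.format("hudi").load("s3a://bucket/var/hudi_poc/my_hudi_table")
readDF.select("guid", "name", "timestamp", "partitionpath").show(truncate=False)
```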
**Additional context**
This error occurs intermittently. I added the Hudi Spark bundle jar to the executor and driver classpath based on GitHub issue https://github.com/apache/hudi/issues/10609, but the problem still persists.
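A rough way to check whether two copies of the bundle end up on the classpath (one via `spark.driver.extraClassPath`/`spark.executor.extraClassPath`, one via `spark.jars.packages`) is to ask the JVM which classloader serves the Hudi Avro model class named in the cast failure; the snippet below is only an illustrative driver-side sketch using the same `spark` session:
```
# Illustrative check (assumes the same `spark` session as above): see which
# classloader serves the Hudi Avro model class involved in the ClassCastException.
# If Avro records and this class come from different loaders ('app' vs.
# MutableURLClassLoader in the stacktrace below), a duplicate copy of the bundle
# on the classpath would be consistent with the cast failure.
cls = spark._jvm.java.lang.Class.forName("org.apache.hudi.avro.model.HoodieDeleteRecordList")
print(cls.getClassLoader())
```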
**Stacktrace**
```
Py4JJavaError: An error occurred while calling o221.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20241027192251336
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:225)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:492)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:490)
    at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 56.0 failed 4 times, most recent failure: Lost task 7.3 in stage 56.0 (TID 204) (10.249.127.135 executor 2): org.apache.hudi.exception.HoodieException: Error occurs when executing map
    at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:960)
    at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:934)
    at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpComplete(ForkJoinPool.java:1223)
    at java.base/java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1915)
    at java.base/java.util.concurrent.ForkJoinTask.awaitDone(ForkJoinTask.java:433)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:687)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:927)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at org.apache.hudi.common.engine.HoodieLocalEngineContext.map(HoodieLocalEngineContext.java:82)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:280)
    at org.apache.hudi.metadata.BaseTableMetadata.readRecordIndex(BaseTableMetadata.java:303)
    at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:170)
    at org.apache.hudi.index.SparkMetadataTableRecordIndex$RecordIndexFileGroupLookupFunction.call(SparkMetadataTableRecordIndex.java:157)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsToPair$1(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.scanByFullKeys(HoodieMergedLogRecordScanner.java:163)
    at org.apache.hudi.metadata.HoodieMetadataLogRecordReader.getRecordsByKeys(HoodieMetadataLogRecordReader.java:109)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.readLogRecords(HoodieBackedTableMetadata.java:337)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lookupKeysFromFileSlice(HoodieBackedTableMetadata.java:314)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$f9381e22$1(HoodieBackedTableMetadata.java:285)
    at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
    ... 37 more
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.apache.hudi.avro.model.HoodieDeleteRecordList (org.apache.avro.generic.GenericData$Record is in unnamed module of loader 'app'; org.apache.hudi.avro.model.HoodieDeleteRecordList is in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @799a7938)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:169)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:124)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
    ... 44 more
```