bhavya-ganatra opened a new issue, #14114:
URL: https://github.com/apache/hudi/issues/14114
### Describe the problem you faced
I want some custom merge logic for a few specific fields, so I have defined a custom payload class:
```
public class AppendableFieldsRecordMerger extends OverwriteWithLatestAvroPayload {
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    ... // custom merge logic for specific fields
  }
}
```
With this write configuration:
```
.option("hoodie.datasource.write.payload.class",
"com.xxx.xxxx.AppendableFieldsRecordMerger")
```
Basically, for a few fields of list/map/set type I want appendable behaviour; for a few fields, a non-null stored value should not get overwritten; and for a few fields, if the incoming record marks them as null, they should be removed in the current upsert operation. All other fields should behave as last write wins.
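To make the intent concrete, here is a minimal sketch of the kind of merge logic I mean. The field names `tags` (appendable list) and `owner` (keep if non-null) are hypothetical placeholders, and the mark-as-null removal case is omitted for brevity:
```
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

public class AppendableFieldsRecordMerger extends OverwriteWithLatestAvroPayload {

  // Hudi instantiates payloads reflectively with (GenericRecord, Comparable)
  public AppendableFieldsRecordMerger(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public AppendableFieldsRecordMerger(Option<GenericRecord> record) {
    super(record);
  }

  @Override
  @SuppressWarnings("unchecked")
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    Option<IndexedRecord> incomingOpt = getInsertValue(schema);
    if (!incomingOpt.isPresent()) {
      return Option.empty(); // incoming record is a delete
    }
    GenericRecord incoming = (GenericRecord) incomingOpt.get();
    GenericRecord current = (GenericRecord) currentValue;

    // Appendable field: concatenate the stored list with the incoming one.
    List<Object> merged = new ArrayList<>();
    if (current.get("tags") != null) {
      merged.addAll((List<Object>) current.get("tags"));
    }
    if (incoming.get("tags") != null) {
      merged.addAll((List<Object>) incoming.get("tags"));
    }
    incoming.put("tags", merged);

    // Keep-if-non-null field: don't let a null incoming value clobber it.
    if (incoming.get("owner") == null) {
      incoming.put("owner", current.get("owner"));
    }

    // Every other field falls through to last-write-wins, since we return `incoming`.
    return Option.of(incoming);
  }
}
```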
While testing this behaviour, I found that the reader Spark job also needs the same class on its classpath; otherwise it fails:
```
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3613)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:3613)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:204)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:120)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:77)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:473)
    at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:411)
    at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:111)
    at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:235)
    at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:246)
    at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:251)
    at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    ... 1 more
Caused by: org.apache.hudi.exception.HoodieException: Unable to load class
    at org.apache.hudi.common.util.ReflectionUtils.lambda$getClass$0(ReflectionUtils.java:55)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:51)
    at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:98)
    at org.apache.hudi.common.util.SpillableMapUtils.generateEmptyPayload(SpillableMapUtils.java:179)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.processNextDeletedRecord(HoodieMergedLogRecordScanner.java:297)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
    ... 32 more
Caused by: java.lang.ClassNotFoundException: com.concentricai.lhwriter.merge.AppendableFieldsRecordMerger
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:421)
    at java.base/java.lang.Class.forName(Class.java:412)
    at org.apache.hudi.common.util.ReflectionUtils.lambda$getClass$0(ReflectionUtils.java:53)
    ... 41 more
```
I am using a MOR table. Is this expected behaviour? From the stack trace, the reader's `HoodieMergedLogRecordScanner` loads the payload class reflectively while merging log files, so the class apparently has to be on the reader's classpath as well, but I couldn't find any official documentation mentioning this requirement.
I am new to Hudi, so apologies if this duplicates an existing issue.
Hudi version: 0.15
### To Reproduce
1. Define a custom payload class extending `OverwriteWithLatestAvroPayload` and bundle it only in the writer job's jar.
2. Write to a MOR table with `hoodie.datasource.write.payload.class` set to that class.
3. Upsert records (including deletes) so that log files are produced.
4. Read the table from a separate Spark job that does not have the custom class on its classpath; the read fails with the `ClassNotFoundException` above (minimal reader sketch below).
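A minimal reader that triggers the failure (a sketch in the Java API for clarity; our actual job is PySpark, and the table path is a placeholder):
```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReaderRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hudi-reader").getOrCreate();
    // Snapshot read of the MOR table; the path is a placeholder.
    Dataset<Row> df = spark.read().format("hudi").load("s3a://warehouse/my_table");
    // count() forces the merged log scan, which reflectively loads the payload class.
    System.out.println(df.count());
  }
}
```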
### Expected behavior
I was hoping that the custom payload class would not have to be bundled into the jar used by the reader job.
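For now, the only thing that works is shipping the class with the reader job as well, e.g. something like this (jar and script names are placeholders):
```shell
# Ship the jar containing the custom payload class with the reader job too
spark-submit \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
  --jars /path/to/custom-payload.jar \
  reader_job.py
```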
### Environment Description
* Hudi version: 0.15
* Spark version: 3.5.1
* Flink version: -
* Hive version: -
* Hadoop version: -
* Storage (HDFS/S3/GCS..): Minio
* Running on Docker? (yes/no): yes