bhavya-ganatra opened a new issue, #14114:
URL: https://github.com/apache/hudi/issues/14114

   ### Describe the problem you faced
   
   I want custom merge logic for a few specific fields, so I have defined a custom payload class that extends `OverwriteWithLatestAvroPayload` and overrides `combineAndGetUpdateValue`:
   
   ```
   public class AppendableFieldsRecordMerger extends OverwriteWithLatestAvroPayload {
   
       @Override
       public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
           // custom per-field merge logic here
       }
   }
   ```
   
   With the write configuration:
   ```
   .option("hoodie.datasource.write.payload.class", "com.xxx.xxxx.AppendableFieldsRecordMerger")
   ```
   Basically, the merge semantics I want are:
   
   - For a few list/map/set-type fields: appendable behaviour (the incoming collection is merged into the existing one).
   - For a few fields: if the existing value is non-null, it should not get overwritten.
   - For a few fields: if the incoming record explicitly marks them as null, those fields should be removed in the current upsert operation.
   - For all remaining fields: last write wins.
   
   A rough sketch of what this could look like is shown below.
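   
   This is only a hypothetical sketch against the Hudi 0.15 Avro payload API; the field names (`tags`, `firstSeen`) are placeholders, and the "explicit null means remove" rule is only noted in a comment, since it needs a sentinel value to distinguish "set to null" from "not set":
   
   ```
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.generic.IndexedRecord;
   import org.apache.hudi.avro.HoodieAvroUtils;
   import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
   import org.apache.hudi.common.util.Option;
   
   public class AppendableFieldsRecordMerger extends OverwriteWithLatestAvroPayload {
   
       // Placeholder field names -- replace with the actual schema fields.
       private static final Set<String> APPEND_FIELDS =
           new HashSet<>(Arrays.asList("tags"));
       private static final Set<String> KEEP_IF_NON_NULL_FIELDS =
           new HashSet<>(Arrays.asList("firstSeen"));
   
       // Hudi creates payload instances via reflection, so keep both constructors.
       public AppendableFieldsRecordMerger(GenericRecord record, Comparable orderingVal) {
           super(record, orderingVal);
       }
   
       public AppendableFieldsRecordMerger(Option<GenericRecord> record) {
           super(record);
       }
   
       @SuppressWarnings({"unchecked", "rawtypes"})
       @Override
       public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
               throws IOException {
           if (recordBytes.length == 0) {
               // No payload bytes means the incoming record is a delete.
               return Option.empty();
           }
           GenericRecord incoming = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
           GenericRecord current = (GenericRecord) currentValue;
   
           for (Schema.Field field : schema.getFields()) {
               String name = field.name();
               Object incomingVal = incoming.get(name);
               Object currentVal = current.get(name);
   
               if (APPEND_FIELDS.contains(name)
                       && currentVal instanceof List && incomingVal instanceof List) {
                   // Appendable: existing elements followed by the incoming ones.
                   List merged = new ArrayList<>((List) currentVal);
                   merged.addAll((List) incomingVal);
                   incoming.put(name, merged);
               } else if (KEEP_IF_NON_NULL_FIELDS.contains(name) && currentVal != null) {
                   // Keep the existing non-null value instead of overwriting it.
                   incoming.put(name, currentVal);
               }
               // All other fields: last write wins, i.e. the incoming value stands.
               // "Explicit null means remove" would need a sentinel value, because a
               // plain null in the incoming Avro record is indistinguishable from
               // "field not provided".
           }
           return Option.of(incoming);
       }
   }
   ```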
   
   While testing this behaviour, I ran into an issue: the same class is also required on the classpath of my reader Spark job, otherwise the read fails:
   
   ```
   Driver stacktrace:
           at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
           at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
           at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
           at scala.Option.foreach(Option.scala:407)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
           at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
           at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
           at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
           at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3614)
           at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3613)
           at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
           at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
           at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
           at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
           at org.apache.spark.sql.Dataset.count(Dataset.scala:3613)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
           at java.base/java.lang.reflect.Method.invoke(Method.java:580)
           at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
           at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
           at py4j.Gateway.invoke(Gateway.java:282)
           at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
           at py4j.commands.CallCommand.execute(CallCommand.java:79)
           at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
           at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
           at java.base/java.lang.Thread.run(Thread.java:1583)
   Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:362)
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:220)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:204)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:120)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:77)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:473)
           at org.apache.hudi.LogFileIterator$.scanLog(Iterators.scala:411)
           at org.apache.hudi.LogFileIterator.<init>(Iterators.scala:111)
           at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:235)
           at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:246)
           at org.apache.hudi.RecordMergingFileIterator.<init>(Iterators.scala:251)
           at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:109)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
           at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
           ... 1 more
   Caused by: org.apache.hudi.exception.HoodieException: Unable to load class
           at org.apache.hudi.common.util.ReflectionUtils.lambda$getClass$0(ReflectionUtils.java:55)
           at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
           at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:51)
           at org.apache.hudi.common.util.HoodieRecordUtils.loadPayload(HoodieRecordUtils.java:98)
           at org.apache.hudi.common.util.SpillableMapUtils.generateEmptyPayload(SpillableMapUtils.java:179)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.processNextDeletedRecord(HoodieMergedLogRecordScanner.java:297)
           at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
           at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:663)
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:352)
           ... 32 more
   Caused by: java.lang.ClassNotFoundException: com.concentricai.lhwriter.merge.AppendableFieldsRecordMerger
           at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
           at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
           at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
           at java.base/java.lang.Class.forName0(Native Method)
           at java.base/java.lang.Class.forName(Class.java:421)
           at java.base/java.lang.Class.forName(Class.java:412)
           at org.apache.hudi.common.util.ReflectionUtils.lambda$getClass$0(ReflectionUtils.java:53)
           ... 41 more
   ```
   
   I am using a MOR table. Is this expected behavior? I couldn't find any official documentation mentioning that the same custom payload class is also needed on the reader side. From the stack trace, the reader appears to load the payload class via reflection while merging log records (`HoodieRecordUtils.loadPayload` via `ReflectionUtils.getClass`), which would explain why it must be on the reader's classpath.
   
   I am new to Hudi, so apologies if this duplicates an existing issue.
   
   
   ### To Reproduce
   
   1. Define a custom payload class extending `OverwriteWithLatestAvroPayload` and set `hoodie.datasource.write.payload.class` to it in the writer job.
   2. Upsert records into a MOR table so that log files are produced.
   3. Read the table from a separate Spark job whose classpath does not contain the payload class.
   4. The read fails with `ClassNotFoundException` for the payload class.
   
   
   ### Expected behavior
   
   I was hoping that the custom payload class would not have to be bundled into the jar used by the reader job.
   
   
   ### Environment Description
   
   * Hudi version: 0.15
   * Spark version: 3.5.1
   * Flink version: -
   * Hive version: -
   * Hadoop version: -
   * Storage (HDFS/S3/GCS..): MinIO
   * Running on Docker? (yes/no): yes
   
   
   
   ### Stacktrace
   
   See the full stack trace in the description above.

