amrishlal commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1247016425
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -41,13 +42,30 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
-    if (newer.getData() == null) {
-      // Delete record
-      return Option.empty();
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {
Review Comment:
Test case failures occur in `TestMORDataSource` (for example, `testPayloadDelete`), where test cases fail with the following exception:
```
1284819 [Executor task launch worker for task 2.0 in stage 107.0 (TID 136)] ERROR org.apache.spark.executor.Executor [] - Exception in task 2.0 in stage 107.0 (TID 136)
java.lang.ClassCastException: org.apache.hudi.common.model.HoodieEmptyRecord cannot be cast to org.apache.hudi.common.model.HoodieSparkRecord
	at org.apache.hudi.HoodieSparkRecordMerger.merge(HoodieSparkRecordMerger.java:45) ~[classes/:?]
	at org.apache.hudi.RecordMergingFileIterator.merge(Iterators.scala:241) ~[classes/:?]
	at org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:218) ~[classes/:?]
	at org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:203) ~[classes/:?]
	at org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36) ~[classes/:?]
	at org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36) ~[classes/:?]
	at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:61) ~[classes/:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
```
The `HoodieEmptyRecord` that leads to the `ClassCastException` is created in `HoodieMergedLogRecordScanner.java`, line 295:
```
      // Put the DELETE record
      if (recordType == HoodieRecordType.AVRO) {
        records.put(key, SpillableMapUtils.generateEmptyPayload(key, deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
      } else {
        HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key, deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), recordType);
        records.put(key, record);
      }
```
In the SPARK branch above, the delete marker is a `HoodieEmptyRecord`, not a `HoodieSparkRecord`, so an unconditional cast in the merger fails. Based on an offline discussion, we decided to continue with the `instanceof` check before casting to `HoodieSparkRecord`.
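
The guard can be sketched with simplified stand-in types (these are not the actual Hudi classes, which carry far more state; `isDelete` is a hypothetical helper used only to illustrate the control flow):

```java
// Minimal sketch of the instanceof-guarded delete check, assuming
// a simplified record hierarchy in place of HoodieRecord and friends.
abstract class Record {
    abstract Object getData();
}

class SparkRecord extends Record {
    private final boolean deleted;
    SparkRecord(boolean deleted) { this.deleted = deleted; }
    boolean isDeleted() { return deleted; }
    @Override Object getData() { return deleted ? null : new Object(); }
}

class EmptyRecord extends Record {
    // An empty record carries no payload; it always represents a delete.
    @Override Object getData() { return null; }
}

public class MergeGuardSketch {
    // Returns true when the incoming record represents a delete.
    static boolean isDelete(Record newer) {
        if (newer instanceof SparkRecord) {
            // Safe cast: guarded by instanceof, so an EmptyRecord can never
            // reach this branch and trigger a ClassCastException.
            return ((SparkRecord) newer).isDeleted();
        }
        // EmptyRecord (or any other subtype) falls back to the null-data check.
        return newer.getData() == null;
    }

    public static void main(String[] args) {
        System.out.println(isDelete(new EmptyRecord()));      // a delete
        System.out.println(isDelete(new SparkRecord(true)));  // a delete
        System.out.println(isDelete(new SparkRecord(false))); // not a delete
    }
}
```

An unguarded `(SparkRecord) newer` on an `EmptyRecord` would throw the same `ClassCastException` seen in the stack trace above, which is why the `instanceof` check comes first.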
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]