zaza opened a new issue, #9032:
URL: https://github.com/apache/hudi/issues/9032
**Describe the problem you faced**
Reading from a CDC-enabled Hudi table results in a cryptic NPE error as
shown in the stacktrace section. The error occurs after writing to the table
with `WriteOperationType.BULK_INSERT`. Writing with `UPSERT` does not cause any
issues when reading.
**To Reproduce**
Steps to reproduce the behavior:
1. Write to Hudi table:
```
ds.write()
.format("hudi")
//... some options omitted for brevity
.option(DataSourceWriteOptions.OPERATION().key(),
WriteOperationType.BULK_INSERT.name())
.option(HoodieTableConfig.NAME.key(), "foo")
.option(HoodieTableConfig.CDC_ENABLED.key(), "true")
.option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(),
HoodieCDCSupplementalLoggingMode.data_before_after.name())
.mode(Overwrite)
.save(...);
```
2. Read from the table
```
spark.readStream()
.format("hudi")
.option("hoodie.datasource.query.incremental.format",
"cdc")
.option("hoodie.datasource.query.type", "incremental")
.load(...)
.writeStream()
.foreachBatch((batchDF, batchId) -> {
batchDF.show(); // FIXME: this line results in an
NPE
}).start().awaitTermination();
```
**Expected behavior**
Expecting same results when reading from the table regardless of the write
operation used. Expecting "Support all the write operations" as stated in [the
Design Goals section of the CDC
RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-51/rfc-51.md#design-goals).
**Environment Description**
* Hudi version : 0.13.1
* Spark version : 3.3.2
* Hive version : n/a
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : local file system for testing, S3 in the long run
* Running on Docker? (yes/no) : yes
**Stacktrace**
```23/06/21 22:10:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4
times; aborting job
23/06/21 22:10:33 ERROR MicroBatchExecution: Query [id =
e4cb7dc2-dba3-403e-8d9d-d5491dc7e8b0, runId =
1837bba4-981a-4edb-be4d-9d877abf65bd] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID
3) (172.19.0.3 executor 0): java.lang.NullPointerException: Cannot invoke
"org.apache.spark.unsafe.types.UTF8String.toString()" because the return value
of "org.apache.spark.sql.catalyst.InternalRow.getUTF8String(int)" is null
at
org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1(HoodieCDCRDD.scala:562)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1$adapted(HoodieCDCRDD.scala:559)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.convertRowToJsonString(HoodieCDCRDD.scala:559)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.loadNext(HoodieCDCRDD.scala:295)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:263)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:284)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
at
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
at
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
at org.apache.spark.sql.Dataset.show(Dataset.scala:808)
at org.apache.spark.sql.Dataset.show(Dataset.scala:767)
at org.apache.spark.sql.Dataset.show(Dataset.scala:776)
...
Caused by: java.lang.NullPointerException
at
org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1(HoodieCDCRDD.scala:562)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1$adapted(HoodieCDCRDD.scala:559)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.convertRowToJsonString(HoodieCDCRDD.scala:559)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.loadNext(HoodieCDCRDD.scala:295)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:263)
at
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:284)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]