zaza opened a new issue, #9032:
URL: https://github.com/apache/hudi/issues/9032

   **Describe the problem you faced**
   
   Reading from a CDC-enabled Hudi table results in a cryptic NPE error as 
shown in the stacktrace section. The error occurs after writing to the table 
with `WriteOperationType.BULK_INSERT`. Writing with `UPSERT` does not cause any 
issues when reading.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Write to Hudi table:
   ```
   ds.write()
                   .format("hudi")
                   //... some options omitted for brevity
                   .option(DataSourceWriteOptions.OPERATION().key(), 
WriteOperationType.BULK_INSERT.name())
                   .option(HoodieTableConfig.NAME.key(), "foo")
                   .option(HoodieTableConfig.CDC_ENABLED.key(), "true")
                   
.option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(), 
HoodieCDCSupplementalLoggingMode.data_before_after.name())
                   .mode(Overwrite)
                   .save(...);
   ```
   2. Read from the table
   ```
    spark.readStream()
                       .format("hudi")
                       .option("hoodie.datasource.query.incremental.format", 
"cdc")
                       .option("hoodie.datasource.query.type", "incremental")
                       .load(...)
                       .writeStream()
                       .foreachBatch((batchDF, batchId) -> {
                           batchDF.show(); // FIXME: this line results in an 
NPE 
                       }).start().awaitTermination();
   ```
   
   **Expected behavior**
   
   Expecting same results when reading from the table regardless of the write 
operation used. Expecting "Support all the write operations" as stated in [the 
Design Goals section of the CDC 
RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-51/rfc-51.md#design-goals).
   
   **Environment Description**
   
   * Hudi version : 0.13.1
   
   * Spark version : 3.3.2
   
   * Hive version : n/a
   
   * Hadoop version : 3.3.3
   
   * Storage (HDFS/S3/GCS..) : local file system for testing, S3 in the long run
   
   * Running on Docker? (yes/no) : yes
   
   
   **Stacktrace**
   
   ```23/06/21 22:10:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 
times; aborting job
   23/06/21 22:10:33 ERROR MicroBatchExecution: Query [id = 
e4cb7dc2-dba3-403e-8d9d-d5491dc7e8b0, runId = 
1837bba4-981a-4edb-be4d-9d877abf65bd] terminated with error
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3) (172.19.0.3 executor 0): java.lang.NullPointerException: Cannot invoke 
"org.apache.spark.unsafe.types.UTF8String.toString()" because the return value 
of "org.apache.spark.sql.catalyst.InternalRow.getUTF8String(int)" is null
        at 
org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1(HoodieCDCRDD.scala:562)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1$adapted(HoodieCDCRDD.scala:559)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.convertRowToJsonString(HoodieCDCRDD.scala:559)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.loadNext(HoodieCDCRDD.scala:295)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:263)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:284)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
        at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:808)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:767)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:776)
        ...
   Caused by: java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1(HoodieCDCRDD.scala:562)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1$adapted(HoodieCDCRDD.scala:559)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.convertRowToJsonString(HoodieCDCRDD.scala:559)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.loadNext(HoodieCDCRDD.scala:295)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:263)
        at 
org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:284)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to