Hans-Raintree opened a new issue, #9987:
URL: https://github.com/apache/hudi/issues/9987

   **Describe the problem you faced**
   
When reading incrementally with the 'cdc' format, the read fails if the last write to a new partition contained both an insert and a delete of the same record. It also fails if that write contained only a delete.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   ```
   output_path = '<output path>'
   
   hudiOptions = {
       'hoodie.table.name': 'test',
       'hoodie.datasource.write.recordkey.field': '_id',
       'hoodie.datasource.write.precombine.field': 'replicadmstimestamp',
       'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
       'hoodie.datasource.write.partitionpath.field': 'partition',
       'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.AWSDmsAvroPayload',
       'hoodie.table.cdc.enabled': 'true',
       'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after'
   }
   
   data = [("1", "I", "2023-06-14 15:46:06.953746", "A", "A")]
   df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp", 
"code", "partition"])
   
   
   df.write \
       .format('org.apache.hudi') \
       .option('hoodie.datasource.write.operation', 'upsert') \
       .options(**hudiOptions) \
       .mode('append') \
       .save(output_path)
   
   
   data = [("10", "I", "2023-06-15 15:48:06.953746", "B", "B"),
           ("10", "D", "2023-06-15 15:49:06.953746", "B", "B")]
   df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp", 
"code", "partition"])
   
   df.write \
       .format('org.apache.hudi') \
       .option('hoodie.datasource.write.operation', 'upsert') \
       .options(**hudiOptions) \
       .mode('append') \
       .save(output_path)
   
   read_options = {
       'hoodie.datasource.query.type': 'incremental',
       'hoodie.datasource.read.begin.instanttime': '0',
       'hoodie.datasource.query.incremental.format': 'cdc'
   }
   
   df = spark.read \
           .format('org.apache.hudi') \
           .options(**read_options) \
           .load(output_path)
   df.show()
   ```
   The read also fails with the same error if the second write contains only the delete row:

   `("10", "D", "2023-06-15 15:49:06.953746", "B", "B")`
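
   A minimal sketch of that delete-only variant, reusing the same `hudiOptions` and `output_path` as above (the data row is taken verbatim from the report):

   ```
   # Second write: a single DMS 'D' row for a record that never existed
   # in the (new) partition 'B'; the incremental cdc read then fails.
   data = [("10", "D", "2023-06-15 15:49:06.953746", "B", "B")]
   df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp", "code", "partition"])

   df.write \
       .format('org.apache.hudi') \
       .option('hoodie.datasource.write.operation', 'upsert') \
       .options(**hudiOptions) \
       .mode('append') \
       .save(output_path)
   ```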
   
   **Expected behavior**
   
   The incremental read with the 'cdc' format does not fail.
   
   **Environment Description**
   
   * Hudi version : 0.14.0
   
   * Spark version : 3.4.0
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   I think the issue is probably in `HoodieCDCExtractor.parseWriteStat` (per the stack trace), here:
   
   ```
   if (WriteOperationType.isDelete(operation) && writeStat.getNumWrites() == 0L
        && writeStat.getNumDeletes() != 0) {
     // This is a delete operation wherein all the records in this file group are deleted
     // and no records have been written out to a new file.
     // So, we find the previous file that this operation deleted from, and treat each of
     // its records as a deleted one.
     HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
          fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), fileGroupId.getFileId()
     ).orElseThrow(() ->
          new HoodieIOException("Can not get the previous version of the base file")
     );
     FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
     cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, new ArrayList<>(), Option.of(beforeFileSlice), Option.empty());
   } else if (writeStat.getNumUpdateWrites() == 0L && writeStat.getNumDeletes() == 0
        && writeStat.getNumWrites() == writeStat.getNumInserts()) {
     // all the records in this file are new.
     cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_INSERT, path);
   } else {
     throw new HoodieException("There should be a cdc log file.");
   }
   ```
   
   I'm not sure exactly what goes wrong here. My guess is that `writeStat.getNumDeletes()` is somehow greater than 0, even though it should be 0 because nothing actually got deleted (the deleted record never existed in the new partition), so neither branch matches and the code falls through to the exception.
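
   One way to verify that guess is to inspect the write stats recorded in the commit metadata, which is what `parseWriteStat` branches on. A minimal sketch, assuming the table sits on a locally readable path (for S3, copy the `.hoodie` folder down first) and assuming the standard JSON layout of `HoodieCommitMetadata` (a `partitionToWriteStats` map from partition to a list of `HoodieWriteStat` objects with `numWrites`/`numInserts`/`numDeletes`/`numUpdateWrites` fields; exact field names may vary by Hudi version):

   ```
   import glob
   import json
   import os

   table_path = '<output path>'  # same table as output_path above

   # Each completed commit leaves an <instant>.commit JSON file under .hoodie/.
   for commit_file in sorted(glob.glob(os.path.join(table_path, '.hoodie', '*.commit'))):
       with open(commit_file) as f:
           meta = json.load(f)
       for partition, stats in meta.get('partitionToWriteStats', {}).items():
           for s in stats:
               # If numDeletes > 0 for the brand-new partition, neither branch in
               # parseWriteStat matches and the HoodieException is thrown.
               print(os.path.basename(commit_file), partition, s.get('fileId'),
                     'numWrites=%s' % s.get('numWrites'),
                     'numInserts=%s' % s.get('numInserts'),
                     'numDeletes=%s' % s.get('numDeletes'),
                     'numUpdateWrites=%s' % s.get('numUpdateWrites'))
   ```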
   
   **Stacktrace**
   
   ```An error was encountered:
   An error occurred while calling o389.showString.
   : org.apache.hudi.exception.HoodieException: There should be a cdc log file.
        at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.parseWriteStat(HoodieCDCExtractor.java:276)
        at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.lambda$extractCDCFileSplits$1(HoodieCDCExtractor.java:131)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.extractCDCFileSplits(HoodieCDCExtractor.java:126)
        at org.apache.hudi.cdc.CDCRelation.buildScan0(CDCRelation.scala:105)
        at org.apache.hudi.cdc.CDCRelation.buildScan(CDCRelation.scala:87)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:366)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:400)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:456)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:399)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:366)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:514)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4232)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3205)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3426)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:286)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:325)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)

   Traceback (most recent call last):
     File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 900, in show
       print(self._jdf.showString(n, 20, vertical))
     File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
       answer, self.gateway_client, self.target_id, self.name)
     File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
       return f(*a, **kw)
     File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
       format(target_id, ".", name), value)
   py4j.protocol.Py4JJavaError: An error occurred while calling o389.showString.
   : org.apache.hudi.exception.HoodieException: There should be a cdc log file.
        ... (Java stack trace identical to the one above) ...
   ```
   
   

