Hans-Raintree opened a new issue, #9987:
URL: https://github.com/apache/hudi/issues/9987
**Describe the problem you faced**
When reading incrementally with format 'cdc', the read fails if the last write to a new partition contained both an insert and a delete. It also fails if that write contained only a delete.
**To Reproduce**
Steps to reproduce the behavior:
```python
output_path = '<output path>'

hudiOptions = {
    'hoodie.table.name': 'test',
    'hoodie.datasource.write.recordkey.field': '_id',
    'hoodie.datasource.write.precombine.field': 'replicadmstimestamp',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': 'partition',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.AWSDmsAvroPayload',
    'hoodie.table.cdc.enabled': 'true',
    'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after'
}

# First write: a single insert into partition "A"
data = [("1", "I", "2023-06-14 15:46:06.953746", "A", "A")]
df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp", "code", "partition"])
df.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save(output_path)

# Second write: an insert and a delete of the same key into a new partition "B"
data = [("10", "I", "2023-06-15 15:48:06.953746", "B", "B"),
        ("10", "D", "2023-06-15 15:49:06.953746", "B", "B")]
df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp", "code", "partition"])
df.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save(output_path)

# Incremental read in CDC format from the beginning of the timeline
read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': '0',
    'hoodie.datasource.query.incremental.format': 'cdc'
}

df = spark.read \
    .format('org.apache.hudi') \
    .options(**read_options) \
    .load(output_path)
df.show()
```
Also, if the second write contained only the delete row
`("10", "D", "2023-06-15 15:49:06.953746", "B", "B")`, it fails with the same error.
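For completeness, a minimal sketch of that delete-only second write (my own illustration, reusing the same `hudiOptions` and `output_path` from the steps above):

```python
# Delete-only second write into the new partition "B"
data = [("10", "D", "2023-06-15 15:49:06.953746", "B", "B")]
df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp", "code", "partition"])
df.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save(output_path)
```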
**Expected behavior**
Read doesn't fail.
**Environment Description**
* Hudi version : 0.14.0
* Spark version : 3.4.0
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
I think the issue is probably in `HoodieCDCExtractor.parseWriteStat`, which the stack trace points at:
```java
if (WriteOperationType.isDelete(operation) && writeStat.getNumWrites() == 0L
    && writeStat.getNumDeletes() != 0) {
  // This is a delete operation wherein all the records in this file group are deleted
  // and no records have been written out to a new file.
  // So, we find the previous file that this operation deleted from, and treat each of
  // its records as a deleted one.
  HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
      fileGroupId.getPartitionPath(), writeStat.getPrevCommit(), fileGroupId.getFileId()
  ).orElseThrow(() ->
      new HoodieIOException("Can not get the previous version of the base file")
  );
  FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(),
      beforeBaseFile, Collections.emptyList());
  cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, new ArrayList<>(),
      Option.of(beforeFileSlice), Option.empty());
} else if (writeStat.getNumUpdateWrites() == 0L && writeStat.getNumDeletes() == 0
    && writeStat.getNumWrites() == writeStat.getNumInserts()) {
  // all the records in this file are new.
  cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_INSERT, path);
} else {
  throw new HoodieException("There should be a cdc log file.");
}
```
I'm not sure what exactly is going wrong here; maybe `writeStat.getNumDeletes()` is somehow greater than 0, although it should be 0, because nothing actually got deleted.
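One way to check that hypothesis would be to dump the per-file write stats straight from the commit metadata files under `.hoodie/` (a rough sketch; the `<instant>.commit` naming and the `partitionToWriteStats`/`numWrites`/`numDeletes` JSON field names are assumptions based on how `HoodieCommitMetadata` is serialized, and for an S3 table the files would need to be copied locally first):

```python
import glob
import json

table_path = '<output path>'  # assumed local copy of the table

# Each completed commit writes a JSON metadata file named <instant>.commit
for commit_file in sorted(glob.glob(f'{table_path}/.hoodie/*.commit')):
    with open(commit_file) as f:
        metadata = json.load(f)
    print(commit_file)
    # partitionToWriteStats maps partition path -> list of per-file write stats
    for partition, stats in metadata.get('partitionToWriteStats', {}).items():
        for stat in stats:
            print(f"  partition={partition} fileId={stat.get('fileId')} "
                  f"numWrites={stat.get('numWrites')} numInserts={stat.get('numInserts')} "
                  f"numUpdateWrites={stat.get('numUpdateWrites')} numDeletes={stat.get('numDeletes')}")
```

If the second commit's stats show `numDeletes > 0` for the new file group in partition "B", that would confirm the first branch above is being skipped for the wrong reason.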
**Stacktrace**
```
An error was encountered:
An error occurred while calling o389.showString.
: org.apache.hudi.exception.HoodieException: There should be a cdc log file.
	at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.parseWriteStat(HoodieCDCExtractor.java:276)
	at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.lambda$extractCDCFileSplits$1(HoodieCDCExtractor.java:131)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.apache.hudi.common.table.cdc.HoodieCDCExtractor.extractCDCFileSplits(HoodieCDCExtractor.java:126)
	at org.apache.hudi.cdc.CDCRelation.buildScan0(CDCRelation.scala:105)
	at org.apache.hudi.cdc.CDCRelation.buildScan(CDCRelation.scala:87)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:366)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:400)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:456)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:399)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:366)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:514)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
	at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4232)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3205)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3426)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:286)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:325)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 900, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o389.showString.
: org.apache.hudi.exception.HoodieException: There should be a cdc log file.
```