Hans-Raintree opened a new issue, #8968:
URL: https://github.com/apache/hudi/issues/8968
**Describe the problem you faced**
Deltastreamer ingest fails when trying to upsert deletes that do not exist
in the hudi dataset.
**To Reproduce**
Steps to reproduce the behavior:
```
data = [("1", "I", "2023-06-14 15:46:06.953746", "A"),
("2", "I", "2023-06-14 15:46:07.953746", "B"),
("3", "I", "2023-06-14 15:46:08.953746", "C")]
df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp",
"code"])
hudiOptions = {
'hoodie.table.name': 'test',
'hoodie.datasource.write.recordkey.field': '_id',
'hoodie.datasource.write.precombine.field': 'replicadmstimestamp',
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
'hoodie.datasource.write.partitionpath.field': '',
'hoodie.datasource.write.payload.class':
'org.apache.hudi.common.model.AWSDmsAvroPayload',
'hoodie.table.cdc.enabled': 'true',
'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after'
}
df.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.options(**hudiOptions) \
.mode('append') \
.save(output_path)
data = [("8", "D", "2023-06-14 15:47:09.953746", "B")]
df = spark.createDataFrame(data, ["_id", "Op", "replicadmstimestamp",
"code"])
df.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.options(**hudiOptions) \
.mode('append') \
.save(output_path)
```
**Expected behavior**
Upsert doesn't fail, either doesn't get written into the CDC log or before
is taken from the input instead.
**Environment Description**
* Hudi version : 0.13.1
* Spark version : 3.3.1
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
Happens both in AWS EMR and when I tested locally.
**Stacktrace**
```
Caused by: java.util.NoSuchElementException: No value present in Option
at org.apache.hudi.common.util.Option.get(Option.java:89)
at
org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:78)
at
org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:73)
at
org.apache.hudi.common.model.HoodieAvroRecord.toIndexedRecord(HoodieAvroRecord.java:210)
at
org.apache.hudi.io.HoodieMergeHandleWithChangeLog.writeInsertRecord(HoodieMergeHandleWithChangeLog.java:106)
at
org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397)
at
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:405)
at
org.apache.hudi.io.HoodieMergeHandleWithChangeLog.close(HoodieMergeHandleWithChangeLog.java:112)
at
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:372)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:363)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
... 28 more
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
at
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:945)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:381)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]