WTa-hash edited a comment on issue #2057: URL: https://github.com/apache/hudi/issues/2057#issuecomment-683774247
After implementing the custom AWSDmsAvroPayload class referenced in https://issues.apache.org/jira/browse/HUDI-802. It seems processing insert + delete for a particular record within the same batch works correctly, but deletes do not work correctly in later batches and errors out. The error is: 20/08/31 13:17:40 WARN TaskSetManager: Lost task 0.0 in stage 184.0 (TID 64740, ip-xxx-xx-x-xxx.ec2.internal, executor 3): org.apache.hudi.exception.HoodieIOException: IOException when reading log file at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81) at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126) at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 20/08/31 13:17:40 ERROR TaskSetManager: Task 0 in stage 184.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 184.0 failed 4 times, most recent failure: Lost task 0.3 in stage 184.0 (TID 64743, ip-xxx-xx-x-xxx.ec2.internal, executor 4): org.apache.hudi.exception.HoodieIOException: IOException when reading log file at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81) at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126) at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.collect(RDD.scala:989) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45) at org.apache.hudi.client.HoodieWriteClient.doCompactionCommit(HoodieWriteClient.java:1123) at org.apache.hudi.client.HoodieWriteClient.commitCompaction(HoodieWriteClient.java:1091) at org.apache.hudi.client.HoodieWriteClient.runCompaction(HoodieWriteClient.java:1074) at org.apache.hudi.client.HoodieWriteClient.compact(HoodieWriteClient.java:1045) at org.apache.hudi.client.HoodieWriteClient.lambda$forceCompact$12(HoodieWriteClient.java:1160) at org.apache.hudi.common.util.Option.ifPresent(Option.java:96) at org.apache.hudi.client.HoodieWriteClient.forceCompact(HoodieWriteClient.java:1157) at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:502) at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157) at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101) at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:92) at org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:262) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:184) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:108) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94) at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141) at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229) ... 51 elided Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading log file at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244) at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81) at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126) at org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441) at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357) at org.apache.spark.rdd.RDD.iterator(RDD.scala:308) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) This can be reproduced by running the example script from the original post with DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY set to the custom AWSDmsAvroPayload class referenced in https://issues.apache.org/jira/browse/HUDI-802. I do not get the error mentioned above on Copy-On-Write (COW) tables. COW tables seem to process later batches correctly after implementing the custom AWSDmsAvroPayload class. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
