[ 
https://issues.apache.org/jira/browse/HUDI-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-3544:
---------------------------------
    Labels: pull-request-available  (was: )

> Reading from Metadata table fails w/ NPE
> ----------------------------------------
>
>                 Key: HUDI-3544
>                 URL: https://issues.apache.org/jira/browse/HUDI-3544
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: metadata
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>
> In one of our prod tables, we ran into a NullPointerException when reading from 
> the metadata table (MDT). We are using one of the latest master commit hashes. 
>  
> {code:java}
> 22/03/01 15:23:33 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks 
> have all completed, from pool 
> 22/03/01 15:23:33 INFO TaskSchedulerImpl: Cancelling stage 20
> 22/03/01 15:23:33 INFO TaskSchedulerImpl: Killing all running tasks in stage 
> 20: Stage cancelled
> 22/03/01 15:23:33 INFO DAGScheduler: ResultStage 20 (collectAsMap at 
> UpsertPartitioner.java:253) failed in 10.901 s due to Job aborted due to 
> stage failure: Task 0 in stage 20.0 failed 4 times, most recent failure: Lost 
> task 0.3 in stage 20.0 (TID 460) (10.0.30.133 executor 1): 
> org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files 
> in partition 
> s3a://kwabhudi-76437a13-5c90-471b-b6fb-1d362c409e5b/kwabhudi_default/threads 
> from metadata
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:134)
>       at 
> org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:638)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:119)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:182)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:107)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:253)
>       at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading 
> log file 
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:335)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:180)
>       at 
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:104)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:377)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$4(HoodieBackedTableMetadata.java:293)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:283)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$0(HoodieBackedTableMetadata.java:139)
>       at java.util.HashMap.forEach(HashMap.java:1290)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:138)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:128)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:306)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:132)
>       ... 38 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:132)
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:110)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.createHoodieRecord(AbstractHoodieLogRecordReader.java:394)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.createHoodieRecord(HoodieMetadataMergedLogRecordReader.java:96)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:371)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:326)
>       ... 53 more
> Driver stacktrace:
> 22/03/01 15:23:33 INFO DAGScheduler: Job 17 failed: collectAsMap at 
> UpsertPartitioner.java:253, took 10.903527 s
> 22/03/01 15:23:33 INFO HoodieDeltaStreamer: Delta Sync shutdown. Error ?false
> 22/03/01 15:23:33 WARN HoodieDeltaStreamer: Gracefully shutting down compactor
> 22/03/01 15:23:43 INFO AsyncCompactService: Compactor shutting down properly!!
> 22/03/01 15:23:43 INFO HoodieDeltaStreamer: DeltaSync shutdown. Closing write 
> client. Error?true
> 22/03/01 15:23:43 ERROR HoodieAsyncService: Service shutdown with error
> java.util.concurrent.ExecutionException: 
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
> time 20220301152244205
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:182)
>       at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:179)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:530)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>       at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
>       at 
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>       at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>       at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>       at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert 
> for commit time 20220301152244205
>       at 
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExecutor.execute(SparkInsertDeltaCommitActionExecutor.java:47)
>       at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.insert(HoodieSparkMergeOnReadTable.java:96)
>       at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.insert(HoodieSparkMergeOnReadTable.java:78)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:182)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:574)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:329)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:656)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 20.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 20.0 (TID 460) (10.0.30.133 executor 1): 
> org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files 
> in partition 
> s3a://kwabhudi-76437a13-5c90-471b-b6fb-1d362c409e5b/kwabhudi_default/threads 
> from metadata
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:134)
>       at 
> org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:638)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:119)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:182)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:107)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:253)
>       at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading 
> log file 
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:335)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:180)
>       at 
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:104)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:377)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$4(HoodieBackedTableMetadata.java:293)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:283)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$0(HoodieBackedTableMetadata.java:139)
>       at java.util.HashMap.forEach(HashMap.java:1290)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:138)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:128)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:306)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:132)
>       ... 38 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:132)
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:110)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.createHoodieRecord(AbstractHoodieLogRecordReader.java:394)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.createHoodieRecord(HoodieMetadataMergedLogRecordReader.java:96)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:371)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:326)
>       ... 53 more
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
>       at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>       at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
>       at scala.Option.foreach(Option.scala:407)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>       at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
>       at 
> org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>       at 
> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
>       at 
> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:663)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.getSmallFilesForPartitions(UpsertPartitioner.java:253)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:158)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:94)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.<init>(SparkUpsertDeltaCommitPartitioner.java:50)
>       at 
> org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.getUpsertPartitioner(BaseSparkDeltaCommitActionExecutor.java:69)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getInsertPartitioner(BaseSparkCommitActionExecutor.java:405)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getPartitioner(BaseSparkCommitActionExecutor.java:227)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:171)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:84)
>       at 
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
>       ... 11 more
> Caused by: org.apache.hudi.exception.HoodieMetadataException: Failed to 
> retrieve files in partition 
> s3a://kwabhudi-76437a13-5c90-471b-b6fb-1d362c409e5b/kwabhudi_default/threads 
> from metadata
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:134)
>       at 
> org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:638)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:119)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:182)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:107)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:253)
>       at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       ... 3 more
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading 
> log file 
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:335)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:180)
>       at 
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:104)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:377)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$4(HoodieBackedTableMetadata.java:293)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:283)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$0(HoodieBackedTableMetadata.java:139)
>       at java.util.HashMap.forEach(HashMap.java:1290)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:138)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:128)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:306)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:132)
>       ... 38 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:132)
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:110)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.createHoodieRecord(AbstractHoodieLogRecordReader.java:394)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.createHoodieRecord(HoodieMetadataMergedLogRecordReader.java:96)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:371)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:326)
>       ... 53 more
> 22/03/01 15:23:43 INFO DeltaSync: Shutting down embedded timeline server
> 22/03/01 15:23:43 INFO EmbeddedTimelineService: Closing Timeline server
> 22/03/01 15:23:43 INFO TimelineService: Closing Timeline Service
> 22/03/01 15:23:43 INFO Javalin: Stopping Javalin ...
> 22/03/01 15:23:43 ERROR Javalin: Javalin failed to stop gracefully
> java.lang.InterruptedException
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>       at 
> org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:333)
>       at 
> org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
>       at 
> org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:248)
>       at 
> org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
>       at 
> org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:450)
>       at 
> org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
>       at io.javalin.Javalin.stop(Javalin.java:195)
>       at 
> org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:325)
>       at 
> org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:132)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:886)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:792)
>       at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:215)
>       at 
> org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> 22/03/01 15:23:43 INFO Javalin: Javalin has stopped
> 22/03/01 15:23:43 INFO TimelineService: Closed Timeline Service
> 22/03/01 15:23:43 INFO EmbeddedTimelineService: Closed Timeline server
> 22/03/01 15:23:43 INFO SparkUI: Stopped Spark web UI at 
> http://ds-job-s3-incr-8f18725f-888f-4aa3-11e82f7f461234c0-driver-svc.kwabhudi.svc:4040
> 22/03/01 15:23:43 INFO KubernetesClusterSchedulerBackend: Shutting down all 
> executors
> 22/03/01 15:23:43 INFO 
> KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each 
> executor to shut down
> 22/03/01 15:23:43 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has 
> been closed.
> 22/03/01 15:23:44 INFO MapOutputTrackerMasterEndpoint: 
> MapOutputTrackerMasterEndpoint stopped!
> 22/03/01 15:23:44 INFO MemoryStore: MemoryStore cleared
> 22/03/01 15:23:44 INFO BlockManager: BlockManager stopped
> 22/03/01 15:23:44 INFO BlockManagerMaster: BlockManagerMaster stopped
> 22/03/01 15:23:44 INFO 
> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
> OutputCommitCoordinator stopped!
> 22/03/01 15:23:44 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220301152244205
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:184)
>       at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:179)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:530)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>       at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
>       at 
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>       at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>       at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>       at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
> time 20220301152244205
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>       at 
> org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:182)
>       ... 15 more
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert 
> for commit time 20220301152244205
>       at 
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkInsertDeltaCommitActionExecutor.execute(SparkInsertDeltaCommitActionExecutor.java:47)
>       at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.insert(HoodieSparkMergeOnReadTable.java:96)
>       at 
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.insert(HoodieSparkMergeOnReadTable.java:78)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:182)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:574)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:329)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:656)
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 4 times, most recent failure: Lost task 0.3 in stage 20.0 (TID 460) (10.0.30.133 executor 1): org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://kwabhudi-76437a13-5c90-471b-b6fb-1d362c409e5b/kwabhudi_default/threads from metadata
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:134)
>       at 
> org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:638)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:119)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:182)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:107)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:253)
>       at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:335)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:180)
>       at 
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:104)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:377)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$4(HoodieBackedTableMetadata.java:293)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:283)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$0(HoodieBackedTableMetadata.java:139)
>       at java.util.HashMap.forEach(HashMap.java:1290)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:138)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:128)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:306)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:132)
>       ... 38 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:132)
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:110)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.createHoodieRecord(AbstractHoodieLogRecordReader.java:394)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.createHoodieRecord(HoodieMetadataMergedLogRecordReader.java:96)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:371)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:326)
>       ... 53 more
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
>       at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>       at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
>       at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
>       at scala.Option.foreach(Option.scala:407)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
>       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>       at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
>       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>       at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
>       at 
> org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>       at 
> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
>       at 
> org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:663)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.getSmallFilesForPartitions(UpsertPartitioner.java:253)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.assignInserts(UpsertPartitioner.java:158)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.<init>(UpsertPartitioner.java:94)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.<init>(SparkUpsertDeltaCommitPartitioner.java:50)
>       at 
> org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.getUpsertPartitioner(BaseSparkDeltaCommitActionExecutor.java:69)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getInsertPartitioner(BaseSparkCommitActionExecutor.java:405)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getPartitioner(BaseSparkCommitActionExecutor.java:227)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:171)
>       at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:84)
>       at 
> org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
>       ... 11 more
> Caused by: org.apache.hudi.exception.HoodieMetadataException: Failed to 
> retrieve files in partition 
> s3a://kwabhudi-76437a13-5c90-471b-b6fb-1d362c409e5b/kwabhudi_default/threads 
> from metadata
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:134)
>       at 
> org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:304)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:295)
>       at 
> org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:638)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:119)
>       at 
> org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:182)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:107)
>       at 
> org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
>       at 
> org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:253)
>       at 
> org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       ... 3 more
> Caused by: org.apache.hudi.exception.HoodieException: Exception when reading 
> log file 
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:335)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:180)
>       at 
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:104)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:71)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:246)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:377)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$openReadersIfNeeded$4(HoodieBackedTableMetadata.java:293)
>       at 
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.openReadersIfNeeded(HoodieBackedTableMetadata.java:283)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$0(HoodieBackedTableMetadata.java:139)
>       at java.util.HashMap.forEach(HashMap.java:1290)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:138)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:128)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:306)
>       at 
> org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:132)
>       ... 38 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:132)
>       at 
> org.apache.hudi.common.util.SpillableMapUtils.convertToHoodieRecordPayload(SpillableMapUtils.java:110)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.createHoodieRecord(AbstractHoodieLogRecordReader.java:394)
>       at 
> org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.createHoodieRecord(HoodieMetadataMergedLogRecordReader.java:96)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:371)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:326)
>       ... 53 more
> 22/03/01 15:23:44 INFO ShutdownHookManager: Shutdown hook called
> 22/03/01 15:23:44 INFO ShutdownHookManager: Deleting directory 
> /var/data/spark-e901472a-1998-4833-9f0a-03e9b0fc5ba6/spark-bcba3ea6-735e-44a4-b699-6fadea0a6f37
> 22/03/01 15:23:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-38bda3a2-5547-43b0-8e1d-725deecfd536
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to