[
https://issues.apache.org/jira/browse/HUDI-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-5597:
--------------------------------------
Fix Version/s: 0.13.0
> Deltastreamer ingestion fails when consistent hashing index is used
> -------------------------------------------------------------------
>
> Key: HUDI-5597
> URL: https://issues.apache.org/jira/browse/HUDI-5597
> Project: Apache Hudi
> Issue Type: Bug
> Components: writer-core
> Affects Versions: 0.13.0
> Reporter: sivabalan narayanan
> Priority: Critical
> Fix For: 0.13.0
>
>
> I tested the consistent hashing index w/ a deltastreamer pipeline, but it
> failed w/ the exception below. The same pipeline works w/o any issues for
> the default index.
>
> Additional configs I used
> hoodie.index.type=BUCKET
> hoodie.index.bucket.engine=CONSISTENT_HASHING
> hoodie.bucket.index.num.buckets=4
> hoodie.compact.inline.max.delta.commits=2
>
> I have some parquet data in a dir. I am starting a deltastreamer w/ the
> ParquetDFS source for a MOR table, setting the additional configs as shown
> above.
> I did make some minor fixes to my branch (compared to master), but that's
> only to enable inline compaction w/ deltastreamer continuous mode. In
> general, only async compaction is allowed w/ deltastreamer continuous mode.
> I just wanted to test inline for now. Apart from that, I am using the latest
> master to test.
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 1 in stage 100.0 failed 1 times, most recent failure: Lost task 1.0 in
> stage 100.0 (TID 176, localhost, executor driver):
> org.apache.hudi.exception.HoodieException: Unsupported Operation Exception
> at
> org.apache.hudi.common.util.collection.BitCaskDiskMap.values(BitCaskDiskMap.java:303)
> at
> org.apache.hudi.common.util.collection.ExternalSpillableMap.values(ExternalSpillableMap.java:275)
> at java.util.Collections$UnmodifiableMap.values(Collections.java:1487)
> at
> org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397)
> at
> org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:409)
> at
> org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168)
> at
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:224)
> at
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:215)
> at
> org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64)
> at
> org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:231)
> at
> org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$9cd4b1be$1(HoodieCompactor.java:129)
> at
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
> at
> org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
> at
> org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
> at
> org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute(RunCompactionActionExecutor.java:101)
> ... 19 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)