sivabalan narayanan created HUDI-5597:
-----------------------------------------
Summary: Deltastreamer ingestion fails when consistent hashing
index is used
Key: HUDI-5597
URL: https://issues.apache.org/jira/browse/HUDI-5597
Project: Apache Hudi
Issue Type: Bug
Components: writer-core
Affects Versions: 0.13.0
Reporter: sivabalan narayanan
I tested consistent hashing index w/ a deltastreamer pipeline. but it failed w/
below exception. Same pipeline works w/o any issues for default index.
Additional configs I used
hoodie.index.type=BUCKET
hoodie.index.bucket.engine=CONSISTENT_HASHING
hoodie.bucket.index.num.buckets=4
hoodie.compact.inline.max.delta.commits=2
I have some parquet data in a dir. I am starting a deltastreamer w/ PArquetDFS
source for mor table. setting the additional configs as shown above.
I did make some minor fixes to my branch (compared to master), but thats only
to enable inline compaction w/ deltastreamer continuous mode. In general, only
async compaction is allowed w/ detlastreamer continuous. I just wanted to test
inline for now. but apart from that, I am using latest master to test.
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 100.0 failed 1 times, most recent failure: Lost task 1.0 in
stage 100.0 (TID 176, localhost, executor driver):
org.apache.hudi.exception.HoodieException: Unsupported Operation Exception
at
org.apache.hudi.common.util.collection.BitCaskDiskMap.values(BitCaskDiskMap.java:303)
at
org.apache.hudi.common.util.collection.ExternalSpillableMap.values(ExternalSpillableMap.java:275)
at java.util.Collections$UnmodifiableMap.values(Collections.java:1487)
at
org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397)
at
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:409)
at
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:224)
at
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:215)
at
org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64)
at
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:231)
at
org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$9cd4b1be$1(HoodieCompactor.java:129)
at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
at
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
at
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
at
org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute(RunCompactionActionExecutor.java:101)
... 19 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)