sivabalan narayanan created HUDI-5597:
-----------------------------------------

             Summary: Deltastreamer ingestion fails when consistent hashing 
index is used
                 Key: HUDI-5597
                 URL: https://issues.apache.org/jira/browse/HUDI-5597
             Project: Apache Hudi
          Issue Type: Bug
          Components: writer-core
    Affects Versions: 0.13.0
            Reporter: sivabalan narayanan


I tested consistent hashing index w/ a deltastreamer pipeline. but it failed w/ 
below exception. Same pipeline works w/o any issues for default index. 

 

Additional configs I used 
hoodie.index.type=BUCKET
hoodie.index.bucket.engine=CONSISTENT_HASHING
hoodie.bucket.index.num.buckets=4
hoodie.compact.inline.max.delta.commits=2
 
I have some parquet data in a dir. I am starting a deltastreamer w/ PArquetDFS 
source for mor table. setting the additional configs as shown above.
I did make some minor fixes to my branch (compared to master), but thats only 
to enable inline compaction w/ deltastreamer continuous mode. In general, only 
async compaction is allowed w/ detlastreamer continuous. I just wanted to test 
inline for now. but apart from that, I am using latest master to test. 


{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 1 in stage 100.0 failed 1 times, most recent failure: Lost task 1.0 in 
stage 100.0 (TID 176, localhost, executor driver): 
org.apache.hudi.exception.HoodieException: Unsupported Operation Exception
        at 
org.apache.hudi.common.util.collection.BitCaskDiskMap.values(BitCaskDiskMap.java:303)
        at 
org.apache.hudi.common.util.collection.ExternalSpillableMap.values(ExternalSpillableMap.java:275)
        at java.util.Collections$UnmodifiableMap.values(Collections.java:1487)
        at 
org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:397)
        at 
org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:409)
        at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:168)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdateInternal(HoodieSparkCopyOnWriteTable.java:224)
        at 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.handleUpdate(HoodieSparkCopyOnWriteTable.java:215)
        at 
org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64)
        at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:231)
        at 
org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$9cd4b1be$1(HoodieCompactor.java:129)
        at 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at 
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:155)
        at 
org.apache.hudi.table.action.compact.RunCompactionActionExecutor.execute(RunCompactionActionExecutor.java:101)
        ... 19 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to