neerajpadarthi opened a new issue, #9050:
URL: https://github.com/apache/hudi/issues/9050
Hello team,
I am using EMR 6.7 / Hudi 0.11.0. During ingestion, I enabled the metadata
bloom filter index (**hoodie.metadata.index.bloom.filter.enable**), and to use
these stats during upserts I passed the **hoodie.bloom.index.use.metadata**
config, but the job ended with a Hudi exception.
*Hudi Configs*
```python
{
    'hoodie.datasource.hive_sync.database': 'tax',
    'hoodie.table.name': 'line_items',
    'hoodie.datasource.write.recordkey.field': 'line_item_id,transaction_id',
    'hoodie.datasource.write.precombine.field': 'line_item_id',
    'hoodie.datasource.hive_sync.table': 'line_items',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.metadata.enable': 'true',
    'hoodie.metadata.index.column.stats.enable': 'true',
    'hoodie.metadata.index.bloom.filter.enable': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.parquet.max.file.size': 125829120,
    'hoodie.parquet.small.file.limit': 104857600,
    'hoodie.index.type': 'BLOOM',
    'hoodie.bloom.index.use.metadata': 'true',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.upsert.shuffle.parallelism': 750,
    'hoodie.cleaner.commits.retained': 168,
    'hoodie.keep.min.commits': 173,
    'hoodie.keep.max.commits': 174
}
```
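For context, these options are passed as a flat dict to the Spark datasource writer. A minimal PySpark-style sketch (the actual write call is shown as a comment since it needs a live Spark session; `df` and the table path are hypothetical, and only a subset of the configs above is repeated here):

```python
hudi_configs = {
    'hoodie.table.name': 'line_items',
    'hoodie.index.type': 'BLOOM',
    'hoodie.metadata.enable': 'true',
    'hoodie.metadata.index.bloom.filter.enable': 'true',
    'hoodie.bloom.index.use.metadata': 'true',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.parquet.max.file.size': 125829120,
}

# With a live SparkSession and a DataFrame `df`, the write would look like:
# df.write.format('hudi').options(**spark_options).mode('append').save('s3://bucket/tax/line_items')

# Datasource options are strings; coerce the numeric values before passing them:
spark_options = {k: str(v) for k, v in hudi_configs.items()}
print(spark_options['hoodie.bloom.index.use.metadata'])  # prints 'true'
```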
*1st Run*
The first commit succeeded with no errors. Below is the bloom filters
partition from the MDT folder.

*Second Run*
The second commit failed when the index lookup tried to read the bloom
filter stats from the MDT. Error trace:
```
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
    at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:104)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:187)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:158)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:85)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57)
    ... 54 more
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieIndexException: Failed to get the bloom filter for (,c6274ff2-c1db-4ca1-ba79-a98b565617e6-0_0-60-7661_20230624195945538.parquet)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:183)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.hudi.exception.HoodieIndexException: Failed to get the bloom filter for (,c6274ff2-c1db-4ca1-ba79-a98b565617e6-0_0-60-7661_20230624195945538.parquet)
    at org.apache.hudi.index.bloom.HoodieMetadataBloomIndexCheckFunction$BloomIndexLazyKeyCheckIterator.lambda$computeNext$2(HoodieMetadataBloomIndexCheckFunction.java:124)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.index.bloom.HoodieMetadataBloomIndexCheckFunction$BloomIndexLazyKeyCheckIterator.computeNext(HoodieMetadataBloomIndexCheckFunction.java:117)
    at org.apache.hudi.index.bloom.HoodieMetadataBloomIndexCheckFunction$BloomIndexLazyKeyCheckIterator.computeNext(HoodieMetadataBloomIndexCheckFunction.java:74)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 16 more
```
*Spark UI*

(Observation - Even though I have enabled the MDT bloom filter
lookup (**hoodie.bloom.index.use.metadata**), I still see the 'Obtain key
ranges for file slices (range pruning=on)' stage. I am guessing this is the
same issue as the one reported in https://github.com/apache/hudi/issues/7059,
since the PK is a composite key of 2 columns. Can anyone confirm? Is this
fixed in the latest releases?)
Failed Stage -

Can anyone please help me with this? Please let me know if you need any
additional details. Thanks in advance.
Note - It works when I disable this
config (hoodie.bloom.index.use.metadata).
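The workaround above can be sketched as a small helper that patches the write configs before the upsert; with `hoodie.bloom.index.use.metadata` set to `'false'`, the bloom index falls back to reading bloom filters from the parquet file footers instead of the MDT (the helper name and `base` dict are illustrative, not part of any Hudi API):

```python
def with_metadata_bloom_disabled(configs: dict) -> dict:
    """Return a copy of the write configs with the MDT bloom-filter lookup turned off."""
    patched = dict(configs)  # shallow copy so the original configs are untouched
    patched['hoodie.bloom.index.use.metadata'] = 'false'
    return patched

base = {'hoodie.index.type': 'BLOOM', 'hoodie.bloom.index.use.metadata': 'true'}
print(with_metadata_bloom_disabled(base)['hoodie.bloom.index.use.metadata'])  # prints 'false'
```

Note that `hoodie.metadata.index.bloom.filter.enable` can stay `'true'` while doing this, so the bloom-filter partition keeps being populated in the MDT for later use.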