bhavya-ganatra opened a new issue, #18161:
URL: https://github.com/apache/hudi/issues/18161
### Describe the problem you faced
I am planning to use the consistent hashing bucket index for a Hudi table. I tried running it locally with inline clustering enabled and I am seeing some unexpected behaviour, so I wanted some clarity around a few questions:
1. How do I validate that the number of buckets has increased after clustering?
I am checking the `.hashing_meta` file, which looks like this:
```
{
"version" : 0,
"partitionPath" : "etl_source=dim",
"instant" : "20260210110502321",
"numBuckets" : 2,
"seqNo" : 25,
"nodes" : [ {
"value" : 67108863,
"fileIdPrefix" : "ba8fcebe-5bc9-42ac-a53d-6130d7afd964",
"tag" : "NORMAL"
},{}, {}, {}, ...]
}
```
S3 path structure for this file:
`<hoodieTablePath>/.hoodie/.bucket_index/consistent_hashing_metadata/<partition-value>/<prefix>.hashing_meta`
In the file above, after clustering succeeds, I can see that new parquet files are created and new `nodes` entries are added, but `numBuckets` does not change. Is this expected? If not, how can I validate that the number of buckets has increased?
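For reference, this is roughly how I am inspecting the metadata file after copying it down locally (a minimal sketch; the field names come from the JSON above, and Jackson is assumed to be on the classpath):
```java
// Minimal sketch: parse a downloaded <prefix>.hashing_meta file and print
// numBuckets vs. the number of node entries, to compare before/after clustering.
// Assumes the JSON layout shown above; Jackson is assumed to be on the classpath.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Paths;

public class HashingMetaCheck {
  public static void main(String[] args) throws Exception {
    // args[0]: local path to a .hashing_meta file copied down from S3
    JsonNode meta = new ObjectMapper().readTree(Files.readAllBytes(Paths.get(args[0])));
    System.out.printf("instant=%s numBuckets=%d nodes=%d%n",
        meta.get("instant").asText(),
        meta.get("numBuckets").asInt(),
        meta.get("nodes").size());
  }
}
```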
2. Is the consistent hashing engine not supported for non-partitioned tables, i.e. `hoodie.datasource.write.keygenerator.type: NON_PARTITION`?
While running clustering on a non-partitioned table, I see the exception below:
```
[task-result-getter-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 79.0 (TID 1989) (192.171.189.50 executor 10): java.lang.IllegalArgumentException: Partition should not be null or empty
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
    at org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performBucketMergeForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:117)
    at org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performClusteringForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:92)
    at org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy.lambda$performClustering$9a22fd35$1(SingleSparkJobExecutionStrategy.java:62)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:234)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:363)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1630)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1540)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1604)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1405)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1359)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:395)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:378)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:69)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
    at org.apache.spark.scheduler.Task.run(Task.scala:152)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
```
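For context, the non-partitioned run only changes the key-generator/partition settings relative to the config in the "To Reproduce" section; below is a minimal sketch of that write path, with placeholder class and path names:
```java
// Minimal sketch of how I write the non-partitioned table (class name and paths are
// placeholders). Only the key-generator/partition settings shown here differ from the
// full option map in the "To Reproduce" section below.
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class NonPartitionedUpsert {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("hudi-nonpartitioned-upsert").getOrCreate();
    Dataset<Row> batch = spark.read().parquet(args[0]); // incoming batch (placeholder source)

    Map<String, String> options = new HashMap<>();
    options.put("hoodie.table.name", "content");
    options.put("hoodie.datasource.write.table.type", "MERGE_ON_READ");
    options.put("hoodie.datasource.write.operation", "upsert");
    options.put("hoodie.datasource.write.recordkey.field", "cid");
    options.put("hoodie.datasource.write.precombine.field", "processing_timestamp");
    options.put("hoodie.index.type", "BUCKET");
    options.put("hoodie.index.bucket.engine", "CONSISTENT_HASHING");
    options.put("hoodie.index.bucket.hash.field", "cid");
    // Non-partitioned variant: NON_PARTITION key generator, no partition path field.
    options.put("hoodie.datasource.write.keygenerator.type", "NON_PARTITION");
    options.put("hoodie.datasource.write.partitionpath.field", "");

    batch.write().format("hudi").options(options).mode(SaveMode.Append).save(args[1]);
  }
}
```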
3. Can we not use our own `CustomRecordMerger` class (which extends `HoodieSparkRecordMerger`) during inline clustering? I see the following error during inline clustering:
```
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Stage 56 was cancelled
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ShuffleMapStage 56 (start at HudiWriter.java:172) failed in 0.294 s due to Job aborted due to stage failure: Task 3 in stage 56.0 failed 4 times, most recent failure: Lost task 3.3 in stage 56.0 (TID 1515) (192.171.178.87 executor 16): org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: No valid spark merger implementation set for `hoodie.write.record.merge.custom.implementation.classes`
    at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
    at org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performBucketMergeForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:131)
    at org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performClusteringForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:92)
    at org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy.lambda$performClustering$9a22fd35$1(SingleSparkJobExecutionStrategy.java:62)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:234)
```
I am using this same class for reads and (inline) compaction, and it works fine there. I still have to validate whether it works for async clustering; if it does, this is not a major concern for me.
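For context, this is roughly how the merger is wired into the write options (a sketch: the merge-mode key is my assumption, the implementation-classes key is the one from the exception above, and the package name is a placeholder):
```java
// Sketch of how the custom merger is wired into the write options.
// CustomRecordMerger is my own class extending HoodieSparkRecordMerger; its package
// name here is a placeholder. The merge-mode key is an assumption on my side; the
// implementation-classes key is the one named in the exception above.
import java.util.Map;

public class MergerOptions {
  public static void addCustomMerger(Map<String, String> options) {
    options.put("hoodie.write.record.merge.mode", "CUSTOM"); // assumed key name
    options.put("hoodie.write.record.merge.custom.implementation.classes",
        "com.mycompany.hudi.CustomRecordMerger"); // placeholder package
  }
}
```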
4. I have read that the metadata table should be disabled for consistent hashing. Is that true? I don't see this requirement in the official docs, so I wanted to confirm.
Thank you for any guidance or clarification you can share!
### To Reproduce
I am using the Hudi options below:
```
hoodie.clustering.plan.strategy.target.file.max.bytes=10485760
hoodie.metrics.pushgateway.report.labels=cluster:perf-scale,namespace:bhavya
hoodie.index.type=BUCKET
hoodie.clean.automatic=false
hoodie.compact.inline=false
hoodie.datasource.write.recordkey.field=cid
hoodie.metadata.enable=false
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.datasource.write.keygenerator.type=SIMPLE
hoodie.parquet.small.file.limit=5242880
hoodie.index.bucket.hash.field=cid
hoodie.cleaner.commits.retained=2
hoodie.clustering.inline.max.commits=3
hoodie.memory.spillable.map.path=/tmp/
hoodie.table.cdc.enabled=true
hoodie.delete.shuffle.parallelism=16
hoodie.bucket.index.split.threshold=1.5
hoodie.table.name=content
hoodie.table.log.file.format=parquet
hoodie.upsert.shuffle.parallelism=16
hoodie.clustering.plan.strategy.small.file.limit=6291456
hoodie.datasource.write.precombine.field=processing_timestamp
hoodie.insert.shuffle.parallelism=16
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.datasource.compaction.async.enable=false
hoodie.datasource.write.operation=upsert
hoodie.logfile.data.block.format=parquet
hoodie.index.bucket.engine=CONSISTENT_HASHING
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy
hoodie.metadata.index.column.stats.enable=false
hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy
hoodie.metrics.pushgateway.host=pushgateway-prometheus-pushgateway.monitoring-remote.svc.cluster.local
hoodie.parquet.max.file.size=10485760
hoodie.bucket.index.min.num.buckets=2
hoodie.datasource.write.hive_style_partitioning=true
hoodie.bucket.index.merge.threshold=0.2
hoodie.bucket.index.max.num.buckets=512
hoodie.bucket.index.bucket.split.threshold=1.0
hoodie.datasource.write.row.writer.enable=false
hoodie.bulkinsert.shuffle.parallelism=16
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.compact.inline.trigger.strategy=NUM_COMMITS
hoodie.metrics.pushgateway.port=9091
hoodie.metrics.pushgateway.job.name=lhwriter
hoodie.clustering.inline=true
hoodie.metadata.compact.max.delta.commits=8
hoodie.table.cdc.supplemental.logging=data_before_after
hoodie.memory.merge.fraction=0.2
hoodie.datasource.write.partitionpath.field=etl_source
hoodie.bucket.index.num.buckets=2
hoodie.clustering.inline.max.commit=3
hoodie.metrics.pushgateway.delete.on.shutdown=false
hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
hoodie.metrics.on=true
hoodie.metrics.pushgateway.random.job.name.suffix=false
```
### Expected behavior
Expectations as per my understanding:
1. Not sure.
2. The SIMPLE bucket index engine works fine for a non-partitioned table, so the CONSISTENT_HASHING engine should also work with a NON_PARTITIONED key generator.
3. CustomRecordMerger should work during inline clustering.
4. Not sure.
### Environment Description
* Hudi version: 1.1.0
* Spark version: 3.5.6
* Flink version: NA
* Hive version: NA
* Hadoop version: 3.4.1
* Storage (HDFS/S3/GCS..): S3
* Running on Docker? (yes/no): yes. Using the EMR 7.12.0 image as a base, but with Hudi overridden to version 1.1.0.
### Additional context
_No response_
### Stacktrace
_No response_