bhavya-ganatra opened a new issue, #18161:
URL: https://github.com/apache/hudi/issues/18161

   ### Describe the problem you faced
   
   I am planning to use the consistent hashing bucket index for a Hudi table. I tried running it locally with inline clustering enabled and am seeing some unexpected behaviour, so I would like some clarity on a few questions:
   
   1. How do I validate that the number of buckets has increased after clustering?
   I am checking the `.hashing_meta` file, which looks like below:
   ```
   {
     "version" : 0,
     "partitionPath" : "etl_source=dim",
     "instant" : "20260210110502321",
     "numBuckets" : 2,
     "seqNo" : 25,
     "nodes" : [ {
       "value" : 67108863,
       "fileIdPrefix" : "ba8fcebe-5bc9-42ac-a53d-6130d7afd964",
       "tag" : "NORMAL"
     },{}, {}, {}, ...]
   }
   S3 path structure for this file: 
<hoodieTablePath>/.hoodie/.bucket_index/consistent_hashing_metadata/<partition-value>/<prefix>.hashing_meta
   ```
   In the file above, after clustering succeeds, I see that new parquet files are created and new `node` objects are added, but `numBuckets` does not change. Is this expected? If not, how can I validate that the number of buckets has increased?
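
   For context, here is a minimal sketch of how I am checking the metadata file. It assumes the `.hashing_meta` file has been copied locally (the local path is a placeholder) and simply parses the JSON, then compares `numBuckets` with the number of `nodes` entries:
   ```scala
   import java.io.File
   import com.fasterxml.jackson.databind.ObjectMapper

   // Sketch only: parse a locally downloaded .hashing_meta file and compare
   // numBuckets against the number of hash-range nodes it actually contains.
   object HashingMetaCheck {
     def main(args: Array[String]): Unit = {
       val path = args.headOption.getOrElse("/tmp/ba8fcebe.hashing_meta") // placeholder local copy
       val root = new ObjectMapper().readTree(new File(path))
       val numBuckets = root.get("numBuckets").asInt()
       val nodeCount = root.get("nodes").size()
       println(s"instant=${root.get("instant").asText()} numBuckets=$numBuckets nodes=$nodeCount")
     }
   }
   ```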
   
   2. Is the consistent hashing engine not supported for non-partitioned tables, i.e. `hoodie.datasource.write.keygenerator.type: NON_PARTITION`?
   While running clustering, I saw the exception below for a non-partitioned table:
   
   ```
   [task-result-getter-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost 
task 1.0 in stage 79.0 (TID 1989) (192.171.189.50 executor 10): 
java.lang.IllegalArgumentException: Partition should not be null or empty
       at 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
       at 
org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performBucketMergeForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:117)
       at 
org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performClusteringForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:92)
       at 
org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy.lambda$performClustering$9a22fd35$1(SingleSparkJobExecutionStrategy.java:62)
       at 
org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
       at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:234)
       at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:363)
       at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1630)
       at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1540)
       at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1604)
       at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1405)
       at 
org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1359)
       at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:395)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
       at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:378)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:333)
       at 
org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45)
       at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:69)
       at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
       at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
       at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
       at org.apache.spark.scheduler.Task.run(Task.scala:152)
       at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
       at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
       at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
       at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)
   ```
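
   For reference, a minimal sketch of the non-partitioned write that hits this exception. The table name, base path, and sample data are placeholders; the options are a subset of the config listed under "To Reproduce", with the partition path field removed and the key generator switched to `NON_PARTITION`:
   ```scala
   import org.apache.spark.sql.{SaveMode, SparkSession}

   // Sketch only: upsert into a non-partitioned MOR table with the consistent
   // hashing bucket index and inline clustering enabled.
   object NonPartitionedRepro {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().appName("chb-non-partitioned").getOrCreate()
       import spark.implicits._

       val df = Seq((1, "a", 1700000000L), (2, "b", 1700000001L))
         .toDF("cid", "payload", "processing_timestamp")

       df.write.format("hudi")
         .option("hoodie.table.name", "content_np") // placeholder table name
         .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
         .option("hoodie.datasource.write.operation", "upsert")
         .option("hoodie.datasource.write.recordkey.field", "cid")
         .option("hoodie.datasource.write.precombine.field", "processing_timestamp")
         .option("hoodie.datasource.write.keygenerator.type", "NON_PARTITION")
         .option("hoodie.index.type", "BUCKET")
         .option("hoodie.index.bucket.engine", "CONSISTENT_HASHING")
         .option("hoodie.index.bucket.hash.field", "cid")
         .option("hoodie.bucket.index.num.buckets", "2")
         .option("hoodie.clustering.inline", "true")
         .option("hoodie.clustering.inline.max.commits", "3")
         .mode(SaveMode.Append)
         .save("s3://my-bucket/tables/content_np") // placeholder base path
     }
   }
   ```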
   
   3. Can we not use our own `CustomRecordMerger` class (i.e. one that extends `HoodieSparkRecordMerger`) with inline clustering? I was seeing the error below during inline clustering:
   ```
   [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Stage 56 was cancelled
   [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - 
ShuffleMapStage 56 (start at HudiWriter.java:172) failed in 0.294 s due to Job 
aborted due to stage failure: Task 3 in stage 56.0 failed 4 times, most recent 
failure: Lost task 3.3 in stage 56.0 (TID 1515) (192.171.178.87 executor 16): 
org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: 
No valid spark merger implementation set for 
`hoodie.write.record.merge.custom.implementation.classes`
       at 
org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
       at 
org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performBucketMergeForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:131)
       at 
org.apache.hudi.client.clustering.run.strategy.SingleSparkJobConsistentHashingExecutionStrategy.performClusteringForGroup(SingleSparkJobConsistentHashingExecutionStrategy.java:92)
       at 
org.apache.hudi.client.clustering.run.strategy.SingleSparkJobExecutionStrategy.lambda$performClustering$9a22fd35$1(SingleSparkJobExecutionStrategy.java:62)
       at 
org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
       at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:234)
   ```
   I am using this same class for reads and (inline) compaction, and it works fine there. I still have to validate whether the class works for async clustering; if it does, this is not a major concern for me.
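
   For illustration, a minimal sketch of how the merger would be registered on the write path, using the config key from the error message (the fully qualified class name is a placeholder for our own class; whether any additional merge-mode or strategy option is needed for the clustering writer to resolve it is exactly what I am trying to confirm):
   ```scala
   // Sketch only: "com.example.CustomRecordMerger" is a placeholder for our class
   // that extends HoodieSparkRecordMerger; the key below is the one referenced in
   // the error message above.
   val mergerOpts: Map[String, String] = Map(
     "hoodie.write.record.merge.custom.implementation.classes" -> "com.example.CustomRecordMerger"
   )
   ```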
   
   4. I have read that the metadata table should be disabled for consistent hashing. Is that true? I do not see this requirement in the official docs, so I wanted to confirm.
   
   Thank you for any guidance or clarification you may be able to share!
   
   ### To Reproduce
   
   I am using the Hudi options below:
   
   ```
   hoodie.clustering.plan.strategy.target.file.max.bytes=10485760
   hoodie.metrics.pushgateway.report.labels=cluster:perf-scale,namespace:bhavya
   hoodie.index.type=BUCKET
   hoodie.clean.automatic=false
   hoodie.compact.inline=false
   hoodie.datasource.write.recordkey.field=cid
   hoodie.metadata.enable=false
   hoodie.datasource.write.table.type=MERGE_ON_READ
   hoodie.datasource.write.keygenerator.type=SIMPLE
   hoodie.parquet.small.file.limit=5242880
   hoodie.index.bucket.hash.field=cid
   hoodie.cleaner.commits.retained=2
   hoodie.clustering.inline.max.commits=3
   hoodie.memory.spillable.map.path=/tmp/
   hoodie.table.cdc.enabled=true
   hoodie.delete.shuffle.parallelism=16
   hoodie.bucket.index.split.threshold=1.5
   hoodie.table.name=content
   hoodie.table.log.file.format=parquet
   hoodie.upsert.shuffle.parallelism=16
   hoodie.clustering.plan.strategy.small.file.limit=6291456
   hoodie.datasource.write.precombine.field=processing_timestamp
   hoodie.insert.shuffle.parallelism=16
   hoodie.cleaner.policy.failed.writes=LAZY
   hoodie.datasource.compaction.async.enable=false
   hoodie.datasource.write.operation=upsert
   hoodie.logfile.data.block.format=parquet
   hoodie.index.bucket.engine=CONSISTENT_HASHING
   hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy
   hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy
   hoodie.metadata.index.column.stats.enable=false
   hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy
   hoodie.metrics.pushgateway.host=pushgateway-prometheus-pushgateway.monitoring-remote.svc.cluster.local
   hoodie.parquet.max.file.size=10485760
   hoodie.bucket.index.min.num.buckets=2
   hoodie.datasource.write.hive_style_partitioning=true
   hoodie.bucket.index.merge.threshold=0.2
   hoodie.bucket.index.max.num.buckets=512
   hoodie.bucket.index.bucket.split.threshold=1.0
   hoodie.datasource.write.row.writer.enable=false
   hoodie.bulkinsert.shuffle.parallelism=16
   hoodie.cleaner.policy=KEEP_LATEST_COMMITS
   hoodie.compact.inline.trigger.strategy=NUM_COMMITS
   hoodie.metrics.pushgateway.port=9091
   hoodie.metrics.pushgateway.job.name=lhwriter
   hoodie.clustering.inline=true
   hoodie.metadata.compact.max.delta.commits=8
   hoodie.table.cdc.supplemental.logging=data_before_after
   hoodie.memory.merge.fraction=0.2
   hoodie.datasource.write.partitionpath.field=etl_source
   hoodie.bucket.index.num.buckets=2
   hoodie.clustering.inline.max.commit=3
   hoodie.metrics.pushgateway.delete.on.shutdown=false
   hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
   hoodie.metrics.on=true
   hoodie.metrics.pushgateway.random.job.name.suffix=false
   ```
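
   These options are passed to the Spark datasource writer roughly as sketched below (`hudiOptions` is the map built from the config above; `df` and `basePath` are placeholders):
   ```scala
   import org.apache.spark.sql.DataFrame

   // Sketch only: writes one batch with the options listed above. hudiOptions is
   // the Map[String, String] built from that config; basePath is the S3 table path.
   def writeBatch(df: DataFrame, hudiOptions: Map[String, String], basePath: String): Unit = {
     df.write.format("hudi")
       .options(hudiOptions)
       .mode("append")
       .save(basePath)
   }
   ```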
   
   
   
   ### Expected behavior
   
   Expectations as per my understanding:
    
   1. Not sure. 
   2. The SIMPLE bucket index engine works fine, so the CONSISTENT_HASHING engine should also work for a non-partitioned table.
   3. The custom record merger (extending `HoodieSparkRecordMerger`) should work.
   4. Not sure.
   
   ### Environment Description
   
   * Hudi version: 1.1.0
   * Spark version: 3.5.6
   * Flink version: NA
   * Hive version: NA
   * Hadoop version: 3.4.1
   * Storage (HDFS/S3/GCS..): S3
   * Running on Docker? (yes/no): yes. Using the EMR 7.12.0 image as a base, but with Hudi overridden to version 1.1.0.
   
   
   ### Additional context
   
   _No response_
   
   ### Stacktrace
   
   ```shell
   
   ```

