soumilshah1995 opened a new issue, #8166: URL: https://github.com/apache/hudi/issues/8166
**Subject:** Question on Hudi bucket index

Bucket indexes are suitable for upsert use cases on huge datasets with a large number of file groups within partitions, relatively even data distribution across partitions, and relatively even data distribution on the bucket hash field column. They can deliver better upsert performance in these cases because no index lookup is involved: file groups are located via a [hashing mechanism](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index#RFC29:HashIndex-Howhashindexworks), which is very fast. This is totally different from both the simple and Bloom indexes, where an explicit index lookup step is involved during write. Each bucket has a one-to-one mapping with a Hudi file group, and since the total number of buckets (defined by `hoodie.bucket.index.num.buckets`, default 4) is fixed, this can potentially lead to skewed data (data distributed unevenly across buckets) and scalability issues (buckets can grow over time). These issues will be addressed by the upcoming [consistent hashing bucket index](https://issues.apache.org/jira/browse/HUDI-3000), which is going to be a special type of bucket index.

##### Questions:

* Are the settings below the right way to enable the bucket index?

```
,"hoodie.index.type":"BUCKET"
,"hoodie.index.bucket.engine" : 'SIMPLE'
,'hoodie.storage.layout.partitioner.class':'org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner'
,'hoodie.bucket.index.num.buckets':"4"
```

* Assuming the answer is yes, should I expect to see 4 folders inside which the base files are present, or should I simply see a bucket number (e.g. 0000, 0001) at the start of the base file names?
* How do I specify that I want to perform the hash on, say, the column "country" and not on the record key?
* I am attaching some sample code so I can properly understand: if I want to do the hashing on, say, country, how can I specify the column(s)?
* Is there a way we could elaborate the documentation on the Hudi website about indexes, with more information about the bucket index and the others, plus some examples?
* Is consistent hashing only for MOR tables?

```python
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    pass

SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

db_name = "hudidb"
table_name = "hudi_bucket_table"
recordkey = 'uuid'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "date"
method = 'upsert'
table_type = "COPY_ON_WRITE"  # COPY_ON_WRITE | MERGE_ON_READ
PARTITION_FIELD = "country"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.upsert.shuffle.parallelism": 100,
    "hoodie.index.type": "BUCKET",
    "hoodie.index.bucket.engine": 'SIMPLE',
    'hoodie.storage.layout.partitioner.class': 'org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner',
    'hoodie.bucket.index.num.buckets': "4",
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
    "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
    "hoodie.cleaner.fileversions.retained": "3",
    "hoodie.cleaner.parallelism": '200',  # key fixed: was '"hoodie-conf hoodie.cleaner.parallelism"'
    'hoodie.cleaner.commits.retained': 5
}

spark_df = spark.createDataFrame(
    data=[
        (1, "insert 1", "2020-01-06 12:12:12", "IN"),
        (2, "insert 2", "2020-01-06 12:12:13", "US"),
        (3, "insert 3", "2020-01-06 12:12:15", "IN"),
        (4, "insert 4", "2020-01-06 12:13:15", "US"),
    ],
    schema=["uuid", "message", "date", "country"])
spark_df.show()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

spark_df = spark.createDataFrame(
    data=[
        (1, "update 1", "2020-01-06 12:12:12", "IN"),
        (2, "update 2", "2020-01-06 12:12:13", "US"),
        (5, "insert 5", "2020-01-06 12:13:15", "US"),
    ],
    schema=["uuid", "message", "date", "country"])
spark_df.show()

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
```
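To illustrate my understanding of the routing described above: with a fixed-bucket hash index, the target bucket (and thus the file group) is a pure function of the index key, so no lookup is needed. This is only a minimal sketch, not Hudi's actual code; `crc32` here is a stand-in for whatever hash Hudi uses internally, and `NUM_BUCKETS` plays the role of `hoodie.bucket.index.num.buckets`.

```python
# Minimal sketch (assumption: NOT Hudi's real implementation) of how a
# fixed-bucket hash index maps records to buckets / file groups.
import zlib

NUM_BUCKETS = 4  # analogous to hoodie.bucket.index.num.buckets

def bucket_id(index_key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Deterministically map an index key (the record key, or another
    configured field such as 'country') to one of num_buckets buckets."""
    # crc32 is just a stable hash for illustration.
    return zlib.crc32(index_key.encode("utf-8")) % num_buckets

# Hashing on the record key vs. hashing on another column ("country"):
records = [("1", "IN"), ("2", "US"), ("3", "IN"), ("5", "US")]
by_record_key = {k: bucket_id(k) for k, _ in records}
by_country = {c: bucket_id(c) for _, c in records}

print(by_record_key)  # each uuid lands deterministically in one of 4 buckets
print(by_country)     # all "IN" rows share one bucket, all "US" rows another
```

The flip side of this lookup-free routing, as noted above, is that the bucket count is fixed, so any skew on the hash field directly becomes skew across file groups.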
