ziudu opened a new issue, #11565:
URL: https://github.com/apache/hudi/issues/11565

   **Describe the problem you faced**
   
   When I read a table (e.g. tb_transaction_detail) and write it to another Hudi table (e.g. tb_transaction_detail_bucket_index) with bulk insert and bucket index enabled, I noticed data skew during the stage "save at DatasetBulkInsertCommitActionExecutor.java:81".
   
----------------------------------------------------------------------------------
   tb_transaction_detail has 160 partitions, with 5 files in each partition. Data is evenly distributed, so each file is about 16.7 MB in size. The total size is about 13 GB.
   
   When I read table tb_transaction_detail and write it to another Hudi table tb_transaction_detail_bucket_index with bulk_insert and bucket index, I noticed:
   
   1. The stage "save at DatasetBulkInsertCommitActionExecutor.java:81" has 800 tasks. This is expected, as the input table has 800 files (deduced parallelism = 800).
   
![pic1](https://github.com/apache/hudi/assets/87431810/8739a6e3-9a16-42ac-af66-d46db6ff75a5)
   
   2. Among those 800 tasks, only 224 tasks have data to process, while 576 
tasks have nothing to do.
   
![pic3](https://github.com/apache/hudi/assets/87431810/254a96cc-de76-4d4c-a8e8-18878079813c)
   Note: some tasks have only 20MB of data to process
   
   3. Sorting the tasks by "duration", I can see that some tasks have 10 times more data to process (200 MB+):
   
![pic4](https://github.com/apache/hudi/assets/87431810/cee0e710-0ab2-472e-8ec7-00710d95f2c6)
   
   **However, data in the resulting table "tb_transaction_detail_bucket_index" 
is evenly distributed,** with 160 partitions, each partition has 5 files, each 
file is about 16.67MB.
   
   Is it normal to have skewed data during the write stage when bucket index is enabled? I expect all 320 tasks (spark.sql.shuffle.partitions = 320) to have some data to process. Also, the tasks with the largest "shuffle read" bytes could spill.
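   For what it's worth, this kind of skew can arise simply from hash collisions when distinct (partition path, bucket id) keys are routed to shuffle tasks by hashing. The following is a rough, hypothetical illustration of that effect (it does not use Hudi's actual partitioner, and the exact counts will differ from the job above):

   ```python
   # Rough illustration (assumption: this is NOT Hudi's actual partitioner).
   # 160 table partitions x 5 buckets = 800 distinct (partition, bucket) keys.
   # If each key is routed to a task by hash(key) % num_tasks, collisions
   # leave some tasks empty and pile several keys onto others.
   import hashlib
   from collections import Counter

   num_tasks = 800
   keys = [(p, b) for p in range(160) for b in range(5)]

   def task_of(key, n):
       # Stable hash so the result is reproducible across runs
       h = hashlib.md5(repr(key).encode("utf-8")).hexdigest()
       return int(h, 16) % n

   load = Counter(task_of(k, num_tasks) for k in keys)
   empty = num_tasks - len(load)
   print(f"tasks with data: {len(load)}, empty tasks: {empty}, "
         f"max keys on one task: {max(load.values())}")
   ```

   With 800 keys hashed into 800 tasks, a sizeable fraction of tasks receive no key at all while some receive several, which matches the shape of the skew in the screenshots (many empty tasks, a few tasks with several groups' worth of data).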
   
   
----------------------------------------------------------------------------------
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a table with, for example, 160 partitions, each partition having 5 base files.
   2. Read the table and write it to another table with bulk_insert and bucket index enabled.
   ```python
   # -*- coding:utf8 -*-
   from pyspark.sql import SparkSession

   if __name__ == '__main__':
       spark = SparkSession.builder \
           .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
           .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true") \
           .config("spark.debug.maxToStringFields", "100") \
           .enableHiveSupport().getOrCreate()
       spark.sparkContext.setLogLevel(logLevel="INFO")

       path = "/ilw/test/16889087066964192324/hadoop/ods/research/transaction/mysql_10_43_10_140_3306_db_hudi_research" \
              "/tb_transaction_detail/hoodiedata"
       df = spark.read.format("org.apache.hudi").load(path)
       table_name = "tb_transaction_detail_bucket_index"
       hudi_options = {
           "hoodie.table.name": table_name,
           "hoodie.datasource.write.table.name": table_name,
           "hoodie.datasource.write.table.type": "MERGE_ON_READ",
           "hoodie.datasource.write.recordkey.field": "id,trans_code",
           "hoodie.datasource.write.precombine.field": "ts",
           "hoodie.datasource.write.operation": "bulk_insert",
           "hoodie.datasource.write.hive_style_partitioning": "true",
           "hoodie.metadata.enable": "true",
           "hoodie.metadata.index.bloom.filter.enable": "false",
           "hoodie.metadata.index.column.stats.enable": "false",
           "hoodie.metadata.record.index.enable": "false",
           "hoodie.copyonwrite.record.size.estimate": 150,
           "hoodie.index.type": "BUCKET",
           "hoodie.bucket.index.num.buckets": "5",
           "hoodie.bucket.index.hash.field": "trans_code",
           "hoodie.datasource.write.partitionpath.field": "trans_partition",
       }
       df.write.format("hudi") \
           .options(**hudi_options) \
           .mode("append") \
           .save("/ilw/test/16889087066964192324/hadoop/ods/research/transaction/"
                 f"hudi_research/{table_name}")
   ```
   
   Environment:
   spark.sql.shuffle.partitions = 320
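   For context on the configs above, with a simple bucket index each record's bucket is chosen by hashing the configured hash field modulo the bucket count. A minimal sketch of that idea (an assumption for illustration; Hudi's real implementation uses its own hash function, this only shows the hash-modulo routing behind `hoodie.bucket.index.hash.field`):

   ```python
   # Minimal sketch of bucket-index routing (assumption: not Hudi's real hash).
   import zlib

   NUM_BUCKETS = 5  # hoodie.bucket.index.num.buckets

   def bucket_id(trans_code: str, num_buckets: int = NUM_BUCKETS) -> int:
       # All records sharing a trans_code land in the same bucket (file group)
       # within their partition, so each partition has at most num_buckets
       # file groups.
       return zlib.crc32(trans_code.encode("utf-8")) % num_buckets

   print(bucket_id("TC-0001"), bucket_id("TC-0002"))
   ```

   Because the routing is per record key rather than per input split, the spread of trans_code values, not the input file layout, determines how evenly the write tasks are loaded.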
   
   **Expected behavior**
   
   All tasks during the stage "save at DatasetBulkInsertCommitActionExecutor.java:81" should have some data to process.
   
   **Environment Description**
   * Hudi version :
   * Spark version :
   
   
   

