ziudu opened a new issue, #11565: URL: https://github.com/apache/hudi/issues/11565
**Describe the problem you faced**

When I read a table (e.g. `tb_transaction_detail`) and write it to another Hudi table (e.g. `tb_transaction_detail_bucket_index`) with bulk insert and bucket index enabled, I noticed data skew during the stage "save at DatasetBulkInsertCommitActionExecutor.java:81".

`tb_transaction_detail` has 160 partitions with 5 files per partition. Data is evenly distributed, so each file is about 16.7 MB; the total size is about 13 GB.

When I read `tb_transaction_detail` and write it to `tb_transaction_detail_bucket_index` with bulk_insert and bucket index, I observed:

1. The stage "save at DatasetBulkInsertCommitActionExecutor.java:81" has 800 tasks. This is expected, since the input table has 800 files (deduced parallelism = 800).
2. Among those 800 tasks, only 224 have data to process, while 576 have nothing to do. Note that some tasks have only about 20 MB of data to process.
3. Sorting the tasks by "duration" shows that some tasks have 10 times more data to process (200 MB+).

**However, data in the resulting table `tb_transaction_detail_bucket_index` is evenly distributed:** 160 partitions, 5 files per partition, each file about 16.67 MB.

Is it normal to have skewed data during the write stage when bucket index is enabled? I expected all 320 tasks (with `spark.sql.shuffle.partitions` = 320) to have some data to process. Also, the tasks with the largest "shuffle read" bytes could spill.
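A plausible way to picture the skew (an assumption on my part, not taken from Hudi's actual partitioner code): if each record is routed to a reduce task by hashing its (partition path, bucket id) pair, then the 160 × 5 = 800 file groups collide when hashed into the task slots, leaving some slots empty and piling several file groups onto others. A minimal, self-contained sketch of that model:

```python
# Minimal sketch (assumption: records are routed to reduce tasks by
# hashing the (partition path, bucket id) pair -- this mirrors the idea,
# not Hudi's exact partitioner). With 800 file groups hashed into 800
# task slots, collisions leave many slots empty and pile several file
# groups onto others.
import zlib
from collections import Counter

num_partitions = 160   # Hive partitions in the source table
num_buckets = 5        # hoodie.bucket.index.num.buckets
num_tasks = 800        # tasks observed in the write stage

file_groups = [f"trans_partition={p}/bucket-{b}"
               for p in range(num_partitions) for b in range(num_buckets)]
load = Counter(zlib.crc32(fg.encode()) % num_tasks for fg in file_groups)

print(f"tasks with data: {len(load)}")              # well under 800
print(f"empty tasks:     {num_tasks - len(load)}")  # roughly a third of 800
print(f"max file groups on one task: {max(load.values())}")
```

A task that happens to receive three or four file groups reads three to four times the data of a task with one, which matches the empty tasks and the 200 MB+ outliers in spirit (the exact counts depend on the real hash function).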
**To Reproduce**

Steps to reproduce the behavior:

1. Create a table with, for example, 160 partitions and 5 base files per partition.
2. Read the table and write it to another table with bulk_insert and bucket index enabled:

```python
# -*- coding:utf8 -*-
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true") \
        .config("spark.debug.maxToStringFields", "100") \
        .enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel(logLevel="INFO")

    # Source table: 160 partitions x 5 files, ~13 GB in total
    path = "/ilw/test/16889087066964192324/hadoop/ods/research/transaction/mysql_10_43_10_140_3306_db_hudi_research" \
           "/tb_transaction_detail/hoodiedata"
    df = spark.read.format("org.apache.hudi").load(path)

    table_name = "tb_transaction_detail_bucket_index"
    hudi_options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.recordkey.field": "id,trans_code",
        "hoodie.datasource.write.precombine.field": "ts",
        "hoodie.datasource.write.operation": "bulk_insert",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.bloom.filter.enable": "false",
        "hoodie.metadata.index.column.stats.enable": "false",
        "hoodie.metadata.record.index.enable": "false",
        "hoodie.copyonwrite.record.size.estimate": 150,
        # Bucket index: 5 buckets per partition, hashed on trans_code
        "hoodie.index.type": "BUCKET",
        "hoodie.bucket.index.num.buckets": "5",
        "hoodie.bucket.index.hash.field": "trans_code",
        "hoodie.datasource.write.partitionpath.field": "trans_partition",
    }

    df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save("/ilw/test/16889087066964192324/hadoop/ods/research/transaction/"
              f"hudi_research/{table_name}")
```

Environment: `spark.sql.shuffle.partitions` = 320
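For contrast, the evenness of the *output* files follows from per-record bucket assignment. A minimal sketch, assuming each record's bucket is a hash of the configured hash field (`trans_code`) modulo `hoodie.bucket.index.num.buckets`; this mirrors the idea behind Hudi's bucket index, not its exact hash function, and the `trans_codes` values below are hypothetical:

```python
# Minimal sketch (assumption: bucket id = hash(hash field) % num_buckets,
# mirroring the idea of the bucket index, not Hudi's exact hash function).
# Record keys spread evenly across the 5 buckets of each partition, so the
# resulting *files* are even even though the shuffle *tasks* are not.
import zlib
from collections import Counter

num_buckets = 5  # hoodie.bucket.index.num.buckets
trans_codes = [f"TC{i:08d}" for i in range(100_000)]  # hypothetical keys

per_bucket = Counter(zlib.crc32(c.encode()) % num_buckets for c in trans_codes)
print(per_bucket)  # roughly 20,000 records in each of the 5 buckets
```

Because every record lands in exactly one of the five buckets of its partition and the hash spreads keys evenly, each file group receives a similar share of data, even when one shuffle task happens to write several file groups.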
**Expected behavior**

All tasks in the stage "save at DatasetBulkInsertCommitActionExecutor.java:81" should have some data to process.

**Environment Description**

* Hudi version :

* Spark version :
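One more back-of-the-envelope check (under the same assumed hash-routing model as the first sketch): even an ideal uniform hash of 800 file groups into 320 shuffle partitions would not fill every task, so the expected number of non-empty tasks sits slightly below 320:

```python
# Under the assumed uniform-hash model: probability a given one of the
# 320 shuffle partitions receives none of the 800 file groups, and the
# resulting expected number of tasks that do get data.
num_tasks, num_file_groups = 320, 800
p_empty = (1 - 1 / num_tasks) ** num_file_groups   # ~= e**-2.5 ~= 0.082
expected_with_data = num_tasks * (1 - p_empty)
print(round(expected_with_data))  # ~294 of the 320 tasks
```

So roughly 294 of 320 tasks would have data even in the ideal case; the observed 224 non-empty tasks is lower still, pointing to heavier collisions in the actual key-to-task mapping, but some empty tasks are expected from hashing alone.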