ChiaWeiGithub opened a new issue, #9174: URL: https://github.com/apache/hudi/issues/9174
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

Flink SQL on Hudi creates duplicate records under the same S3 path when the data size is large enough to create a new file group and the writes come from different Flink sessions running as different YARN applications.

**To Reproduce**

Steps to reproduce the behavior:

1. Download the e-commerce dataset from https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store
2. Upload it to an S3 bucket.
3. Run the queries below from Athena (modify the S3 path first):

   ```sql
   CREATE EXTERNAL TABLE `ecommerce`(
     `event_time` string,
     `event_type` string,
     `product_id` bigint,
     `category_id` bigint,
     `category_code` string,
     `brand` string,
     `price` double,
     `user_id` bigint,
     `user_session` string)
   ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
   STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION 's3://<bucket>/opendata/ecommerce/';

   CREATE TABLE ecommercedistinct
   WITH (format = 'Parquet', write_compression = 'SNAPPY') AS
   SELECT sum(price) AS price,
          user_session,
          CAST(NULL AS INTEGER) AS empty_column
   FROM "your_database"."ecommerce"
   WHERE user_session IS NOT NULL
   GROUP BY user_session;
   ```

4. The data still contains null `user_session` values, which will break the query later on, so find the files holding them:

   ```sql
   select "$path", user_session from "ecommercedistinct" order by user_session ASC;
   ```

5. Randomly pick 5 files under the S3 path of table `ecommercedistinct`, avoiding any file that contains a null `user_session`.
6. Launch an EMR cluster: emr-6.7.0 with Flink, primary node m5.2xlarge, 5 core nodes m5.2xlarge.
7. Create a Flink session:

   ```shell
   export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
   sudo /usr/lib/flink/bin/start-cluster.sh
   flink-yarn-session -d -jm 12288 -tm 12288
   /usr/lib/flink/bin/sql-client.sh embedded -e /home/hadoop/sql-env.yaml
   ```

   In the Flink SQL client:

   ```sql
   add jar '/usr/lib/hudi/hudi-flink1.14-bundle_2.12-0.11.0-amzn-0.jar';

   -- Create the source table. Modify the S3 path to your path holding the files
   -- picked in step 5 (the data in partaccesslog.tar.gz).
   CREATE TABLE tablesource1 (
     `price` double,
     `user_session` varchar(255) PRIMARY KEY,
     `empty_column` int
   ) with (
     'connector'='filesystem',
     'path'='s3://<my_bucket>/tables/5file/',
     'format'='parquet');

   -- Create the target table. 'hoodie.copyonwrite.insert.split.size'='5000000'
   -- forces creation of a bigger file group. Modify the S3 path accordingly.
   CREATE TABLE tabletarget1 (
     `price` double,
     `user_session` varchar(255) PRIMARY KEY,
     `empty_column` int
   ) with (
     'connector'='hudi',
     'path'='s3://<my_bucket>/ecommercetarget1/',
     'table.type'='COPY_ON_WRITE',
     'hoodie.compaction.payload.class'='org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload',
     'write.payload.class'='org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload',
     'write.batch-size'='32',
     'hoodie.copyonwrite.insert.split.size'='5000000',
     'table.exec.sink.not-null-enforcer'='drop');

   INSERT INTO tabletarget1 select * from tablesource1;
   -- takes about 6+ minutes to finish and create a file
   ```

8. Make the duplicate record: stop the SQL session, kill the YARN application, create a new one, start a new Flink SQL session, then choose a random `user_session` id and insert it:

   ```sql
   INSERT INTO tabletarget1 (user_session, empty_column) VALUES ('0bc5bd30-4b48-4637-b5f0-9e327707d2bc', 1);
   ```
9. Go to Amazon Athena to check; we can see two rows with `user_session='94CFQJ4T7409AW2P'`:

   ```sql
   CREATE EXTERNAL TABLE `tabletarget`(
     `price` double,
     `user_session` string,
     `empty_column` int)
   ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
   STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION 's3://<my_bucket>/ecommercetarget1/';

   select "$path", "user_session" from tabletarget where user_session='94CFQJ4T7409AW2P';
   ```

**Expected behavior**

Records are not duplicated when the data size is small: for example, if I create an empty table and insert values, the rows are always deduplicated, whether or not I use a new Flink SQL session. I expected the record not to be duplicated here either.

**Environment Description**

* Hudi version : 0.11.0
* Flink version : 1.14.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
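For what it's worth, here is a toy Python sketch of the failure mode we suspect; this is not Hudi code, and the function and file-group naming are invented for illustration only. The idea is that record keys are routed to file groups via an index, and a fresh session whose write path does not consult the existing key-to-file-group mapping writes the same key into a brand-new file group:

```python
import itertools

_fg_counter = itertools.count()

def write_batch(records, index, consult_index=True):
    """Return (record_key, file_group) pairs; `index` maps key -> file group id."""
    out = []
    for key in records:
        if consult_index and key in index:
            fg = index[key]                 # upsert into the existing file group
        else:
            fg = f"fg-{next(_fg_counter)}"  # allocate a brand-new file group
            index[key] = fg
        out.append((key, fg))
    return out

index = {}
# First session writes the key and records it in the key -> file-group index.
first = write_batch(["94CFQJ4T7409AW2P"], index)
# A second session that does not consult that mapping writes the same key
# into a new file group: two physical copies of one logical record.
second = write_batch(["94CFQJ4T7409AW2P"], index, consult_index=False)
print(first, second)  # the two file-group ids differ
```

The repro matches this shape: the duplicates only appear once the table is big enough to span multiple file groups and the second write comes from a different session/YARN application.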
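The Athena check above only looks at one known key. A generic way to scan the `("$path", user_session)` query output for every duplicated key, sketched in plain Python (the S3 paths below are hypothetical):

```python
from collections import Counter

def duplicated_keys(rows):
    """Return record keys that appear in more than one row.

    `rows` is an iterable of (user_session, source_file_path) tuples, e.g. the
    result of:  select "user_session", "$path" from tabletarget;
    """
    counts = Counter(key for key, _path in rows)
    return {k for k, n in counts.items() if n > 1}

# Hypothetical query output: the same key present in two different files.
rows = [
    ("94CFQJ4T7409AW2P", "s3://bucket/ecommercetarget1/fg-0.parquet"),
    ("94CFQJ4T7409AW2P", "s3://bucket/ecommercetarget1/fg-1.parquet"),
    ("0bc5bd30-4b48-4637-b5f0-9e327707d2bc", "s3://bucket/ecommercetarget1/fg-1.parquet"),
]
print(duplicated_keys(rows))  # -> {'94CFQJ4T7409AW2P'}
```

Since `user_session` is the primary key of the Hudi table, this set should always be empty; any key it returns is a duplicate like the one reported above.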
