ChiaWeiGithub opened a new issue, #9174:
URL: https://github.com/apache/hudi/issues/9174

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   flink sql hudi will create duplicate record on same s3 path when` the data 
size is bigger enough to create new filegroup and different flink session with 
different yarn application.`
   
   
   A clear and concise description of the problem.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Downloading e-commerce data from 
https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store
   2. Upload to a s3 bucket
   3. Run the below query from Athena. Please modify the s3 path
   `
   
   CREATE EXTERNAL TABLE `ecommerce`(
     `event_time` string, 
     `event_type` string, 
     `product_id` bigint, 
     `category_id` bigint, 
     `category_code` string, 
     `brand` string, 
     `price` double, 
     `user_id` bigint, 
     `user_session` string)
   ROW FORMAT DELIMITED 
     FIELDS TERMINATED BY ',' 
   STORED AS INPUTFORMAT 
     'org.apache.hadoop.mapred.TextInputFormat' 
   OUTPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION
     's3://<bucket>/opendata/ecommerce/'
   
   CREATE TABLE ecommercedistinct
   WITH (
         format = 'Parquet',
         write_compression = 'SNAPPY'
         )
   AS SELECT sum(price) as price,user_session,  CAST(NULL AS INTEGER) AS 
empty_column FROM "your_database"."ecommerce" where user_session IS NOT NULL 
group by user_session;
   `
   
   4. Since the data still has null data in user_session, it will break the 
query later on. We need to find out the null user session.
   
   select "$path" , user_session from "ecommercedistinct" order by  
user_session ASC;
   
   
   5. random pick up 5 file under the s3 path of table ecommercedistinct, but 
avoid the file with null user_session
   6. launch emr cluster primary : emr-6.7.0 with flink m5.2xlarge, 5 core node 
with m5.2xlarge. 
   7. create flink session
   `export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
   sudo /usr/lib/flink/bin/start-cluster.sh
   flink-yarn-session -d  -jm 12288 -tm 12288
   
   /usr/lib/flink/bin/sql-client.sh embedded -e /home/hadoop/sql-env.yaml 
   flink sql>
   
   add jar '/usr/lib/hudi/hudi-flink1.14-bundle_2.12-0.11.0-amzn-0.jar';
   
   # create source table. Please modified s3 path to your s3 path with data in 
partaccesslog.tar.gz
   CREATE TABLE tablesource1
    (
    `price` double,
    `user_session` varchar(255)  PRIMARY KEY,
    `empty_column` int
    ) with ('connector'='filesystem', 'path'=
   's3://<my_bucket>/tables/5file/','format'='parquet'); 
   
   # create target table, set 'hoodie.copyonwrite.insert.split.size'='5000000' 
to enforce to create a bigger filegroup file, please modified s3 path 
accordingly.
   CREATE TABLE tabletarget1
    (
    `price` double,
    `user_session` varchar(255)  PRIMARY KEY,
    `empty_column` int
    ) with ('connector'='hudi', 'path'=
   's3://<my_bucket>/ecommercetarget1/', 'table.type' =
   'COPY_ON_WRITE', 'hoodie.compaction.payload.class' =
   'org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload',
   'write.payload.class'=
   'org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload',
   'write.batch-size'='32',
   'hoodie.copyonwrite.insert.split.size'='5000000',
   'table.exec.sink.not-null-enforcer'='drop'); 
   
   
   INSERT INTO tabletarget1 select * from tablesource1; <== take about 6+ 
minutes to finished to create a file
   `
   
   7. Make duplicate record
   stop the sql session kill yarn application, create a new one start new flink 
sql session, random choose auser_session id and insert
   
   `
   INSERT INTO tabletarget1 (user_session,empty_column) VALUES 
('0bc5bd30-4b48-4637-b5f0-9e327707d2bc',1);
   
   `
   8. Go to Amazon Athena to check , we can see 2  
user_session='94CFQJ4T7409AW2P'
   
   `
   CREATE EXTERNAL TABLE `tabletarget`(
     `price` double, 
     `user_session` string, 
     `empty_column` int)
   ROW FORMAT SERDE 
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
   STORED AS INPUTFORMAT 
     'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
   OUTPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     's3://<my_bucket>/ecommercetarget1/';
   
   select "$path","user_session" from tabletarget where 
user_session='94CFQJ4T7409AW2P';
   `
   
   **Expected behavior**
   
   The record is not duplicate when the data size is small. For example, if I 
create an empty table and insert values, it will always be the deduplicate. No 
matter I user new flink sql session or not.
   
   I expected the record should not be duplicated.
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version :0.11.0
   
   * Flink version : 1.14.2
   
   * Hadoop version :3.2.1
   
   * Storage (HDFS/S3/GCS..) :S3
   
   * Running on Docker? (yes/no) :no
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to