xuzhiwen1255 opened a new issue, #5614:
URL: https://github.com/apache/iceberg/issues/5614

   ### Apache Iceberg version
   
   0.13.1
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   Versions
    flink: 1.14.5
    iceberg: 0.13.1 and master
   
   ### Problem description
   I wrote 500 million rows to Iceberg with the Flink `datagen` connector. When I ran `select count(*) from t`, I found that the actual row count exceeded 500 million.
   
   ### Operation steps
   ```sql
   -- start  flink-yarn-session
   bin/yarn-session.sh -D pipeline.operator-chaining=false -D taskmanager.memory.jvm-overhead.min=400MB -D taskmanager.memory.jvm-overhead.max=1000MB -D env.java.opts="-XX:+UseG1GC" -s 1 -jm 4096 -tm 4096 -nm flink-sql-session -d
   -- start flink-sql-client (tested with iceberg 0.13.1 and 0.14)
   ./sql-client.sh embedded -j ../icebergjars/iceberg-flink-runtime-1.14-0.13.1.jar -s yarn-session
   drop table dg;
   CREATE TABLE dg (
       id INT,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR,c4 VARCHAR,c5 VARCHAR,c6 
VARCHAR,c7 VARCHAR,c8 VARCHAR,c9 VARCHAR,c10 VARCHAR,c11 VARCHAR,c12 
VARCHAR,c13 VARCHAR,c14 VARCHAR,c15 VARCHAR,c16 VARCHAR,c17 VARCHAR,c18 
VARCHAR,c19 VARCHAR,c20 VARCHAR,p int
   )
   WITH (
       'connector' = 'datagen',
       'rows-per-second' = '100000000',
       'number-of-rows' = '500000000',
       'fields.id.min'='1',
       'fields.id.max'='147483647',
       'fields.c1.length' = '20','fields.c2.length' = '20','fields.c3.length' = 
'20','fields.c4.length' = '20','fields.c5.length' = '20','fields.c6.length' = 
'20','fields.c7.length' = '20','fields.c8.length' = '20','fields.c9.length' = 
'20','fields.c10.length' = '20','fields.c11.length' = '20','fields.c12.length' 
= '20','fields.c13.length' = '20','fields.c14.length' = 
'20','fields.c15.length' = '20','fields.c16.length' = '20','fields.c17.length' 
= '20','fields.c18.length' = '20','fields.c19.length' = 
'20','fields.c20.length' = '20'
   ); 
    CREATE CATALOG hc WITH (
     'type'='iceberg',
     'catalog-type'='hadoop',
     'warehouse'='hdfs://xxx:8020/user/iceberg/warehouse',
     'property-version'='1'
   );
   
   create table hc.db.t1(
    id INT,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR,c4 VARCHAR,c5 VARCHAR,c6 VARCHAR,c7 
VARCHAR,c8 VARCHAR,c9 VARCHAR,c10 VARCHAR,c11 VARCHAR,c12 VARCHAR,c13 
VARCHAR,c14 VARCHAR,c15 VARCHAR,c16 VARCHAR,c17 VARCHAR,c18 VARCHAR,c19 
VARCHAR,c20 VARCHAR,p int
   ) ;
   
   set parallelism.default=30;
   set execution.checkpointing.interval=30sec;
   insert into hc.db.t1 select * from dg;
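   
   -- verification of the row count (the count(*) check described above; table name taken from the DDL in this script)
   select count(*) from hc.db.t1;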
   
   ```
   
   #### Problems that arise
   <img width="606" alt="image" 
src="https://user-images.githubusercontent.com/105710753/186053359-0e31c462-ea2b-4654-9ea9-6ca4bf7339ef.png";>
   
   <img width="592" alt="image" 
src="https://user-images.githubusercontent.com/105710753/186053421-5e4665c7-c1f6-4ee4-b1a2-3d2171a58aac.png";>
   
   Through my testing, I found that after the job runs for a period of time, in certain cases the IcebergFilesCommitter receives two identical data files when a checkpoint is triggered. When the metadata is then written, the same file ends up referenced twice in the manifest list, so the query result exceeds the expected 500 million rows.
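   
   To confirm this on the table side, the duplicated references can be found by scanning the current snapshot and counting how often each data file path is planned. Below is a minimal sketch using the Iceberg Java API; the class name `FindDuplicateDataFiles` and the warehouse host are placeholders, not code from my job:
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.hadoop.HadoopTables;
   import org.apache.iceberg.io.CloseableIterable;
   
   public class FindDuplicateDataFiles {
     public static void main(String[] args) throws Exception {
       // Location of the table created in the reproduction steps (placeholder host).
       String tableLocation = "hdfs://xxx:8020/user/iceberg/warehouse/db/t1";
       Table table = new HadoopTables(new Configuration()).load(tableLocation);
   
       // Count how many times each data file path shows up in the current scan plan.
       Map<String, Integer> pathCounts = new HashMap<>();
       try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
         for (FileScanTask task : tasks) {
           pathCounts.merge(task.file().path().toString(), 1, Integer::sum);
         }
       }
   
       // A path that appears more than once is referenced by multiple manifest entries.
       pathCounts.forEach((path, count) -> {
         if (count > 1) {
           System.out.println(path + " referenced " + count + " times");
         }
       });
     }
   }
   ```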
   
   I added a custom operator that logs the incoming data files and throws an exception when a duplicate file shows up within the same checkpoint.
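   
   The check in that operator is roughly the following (a simplified sketch, not my exact MapTest code; it assumes the operator is placed on the writer's `WriteResult` stream in front of the committer, and `DuplicateFileCheck` is a made-up name):
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.io.WriteResult;
   
   // Pass-through check on the writer output: remember every data file path seen so far
   // and fail fast when the same path arrives again. (The real operator also logs the
   // sending subtask and resets its state per checkpoint; that is omitted here.)
   public class DuplicateFileCheck implements MapFunction<WriteResult, WriteResult> {
   
     private final Set<String> seenPaths = new HashSet<>();
   
     @Override
     public WriteResult map(WriteResult result) {
       for (DataFile file : result.dataFiles()) {
         String path = file.path().toString();
         if (!seenPaths.add(path)) {
           throw new RuntimeException("Same path is = " + path);
         }
       }
       return result;
     }
   }
   ```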
   
   The log output is as follows:
   ```java
   2022-08-22 20:38:24,420 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Start to 
flush snapshot state to state backend, table: hc.db.t1, checkpointId: 27
   2022-08-22 20:38:24,686 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committing 
append with 30 data files and 0 delete files to table hc.db.t1
   2022-08-22 20:38:25,664 INFO  
org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a 
new metadata file 
hdfs://?:8020/user/iceberg/warehouse/db/t1/metadata/v28.metadata.json
   2022-08-22 20:38:25,678 INFO  org.apache.iceberg.SnapshotProducer            
              [] - Committed snapshot 6794546868781528751 (MergeAppend)
   2022-08-22 20:38:25,692 INFO  
org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committed in 
1006 ms
   2022-08-22 20:38:36,464 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 0,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00004-0-e25f4e48-66e5-4351-bdc8-f65f06beb1ef-00028.parquet
 --- 
   2022-08-22 20:38:51,924 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 0,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet
 --- 
   2022-08-22 20:38:52,197 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 0,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00003-0-2dc897ac-7bae-4a99-81f5-7965b81b8f41-00028.parquet
 --- 
   2022-08-22 20:38:52,445 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 4,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00004-0-e25f4e48-66e5-4351-bdc8-f65f06beb1ef-00028.parquet
 --- 
   2022-08-22 20:38:52,446 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 3,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00003-0-2dc897ac-7bae-4a99-81f5-7965b81b8f41-00028.parquet
 --- 
   2022-08-22 20:38:52,447 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 28,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet
 --- 
   2022-08-22 20:38:52,515 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 5,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00005-0-04d90606-4e68-440e-9ce5-58f1e0f2520f-00028.parquet
 --- 
   2022-08-22 20:38:52,516 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 27,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00027-0-e1e4968b-754c-4aad-9bbd-fae9152124d7-00028.parquet
 --- 
   2022-08-22 20:38:52,517 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 25,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00025-0-a2f49612-f038-42c4-83ed-ea1781bffa52-00028.parquet
 --- 
   2022-08-22 20:38:52,523 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 13,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00013-0-41ad64c5-4504-4c8b-bed7-02fd035cfda9-00028.parquet
 --- 
   2022-08-22 20:38:52,700 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 21,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00021-0-5617be9f-5b54-4d7f-bff0-c5c84dd211b3-00028.parquet
 --- 
   2022-08-22 20:38:52,778 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 16,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00016-0-f6b85e68-476a-4091-aba9-b359187ad128-00028.parquet
 --- 
   2022-08-22 20:38:52,800 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 7,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00007-0-bbb9ff21-7425-4eab-9506-3b37953c3143-00028.parquet
 --- 
   2022-08-22 20:38:52,855 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 17,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00017-0-aedbb2ed-0e6f-4c41-bd9c-b79e94a77f5c-00028.parquet
 --- 
   2022-08-22 20:38:52,903 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 0,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00000-0-0c5a8c91-66b5-4c9a-8b6d-9bc0752aae98-00028.parquet
 --- 
   2022-08-22 20:38:52,923 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 23,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00023-0-a987c77b-580f-462e-a824-9111b2f265e0-00028.parquet
 --- 
   2022-08-22 20:38:52,926 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 2,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00002-0-d90d3062-60e1-4886-a9c5-337867ab36f9-00028.parquet
 --- 
   2022-08-22 20:38:52,936 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 26,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00026-0-d4c638a8-0e1d-4592-afad-c88f438ce4e3-00028.parquet
 --- 
   2022-08-22 20:38:52,955 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 18,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00018-0-1120866c-36ee-47d4-8584-ac6e594e7c8d-00028.parquet
 --- 
   2022-08-22 20:38:52,983 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 29,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00029-0-aca3dc1a-20e3-4833-a896-eb74b4fedd2c-00028.parquet
 --- 
   2022-08-22 20:38:53,090 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 22,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00022-0-23ac6b11-0b88-4c62-85e8-901691dc5d14-00028.parquet
 --- 
   2022-08-22 20:38:53,141 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 11,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00011-0-b0966207-bdd0-4796-9114-f843d494f8f7-00028.parquet
 --- 
   2022-08-22 20:38:53,161 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 15,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00015-0-abf724a5-3810-4b85-af27-2dfea9c168a9-00028.parquet
 --- 
   2022-08-22 20:38:53,165 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 1,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00001-0-78b2ab81-8ea8-461d-a018-7fd64d714920-00028.parquet
 --- 
   2022-08-22 20:38:53,180 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 12,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00012-0-9cae32e4-97cd-4fa9-a130-2b9e5baea091-00028.parquet
 --- 
   2022-08-22 20:38:53,200 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 6,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00006-0-546e20d4-2583-4d67-89ef-bdca64d3ec08-00028.parquet
 --- 
   2022-08-22 20:38:53,323 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 14,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00014-0-798c97ee-e504-4e1d-8529-f630c56683e7-00028.parquet
 --- 
   2022-08-22 20:38:53,329 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 20,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00020-0-3dac110a-6303-472a-96fd-a07bd8dca9ae-00028.parquet
 --- 
   2022-08-22 20:38:53,367 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 19,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00019-0-10e161ac-4b99-4061-85a5-01c6f7fa01b9-00028.parquet
 --- 
   2022-08-22 20:38:53,414 WARN  org.apache.iceberg.flink.sink.MapTest          
               [] -  accepted subtask 24,add 
path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00024-0-083dfaff-2144-4d17-bbb5-325d29ea1350-00028.parquet
 --- 
   2022-08-22 20:38:53,423 WARN  org.apache.flink.runtime.taskmanager.Task      
              [] - xzw-consutm (1/1)#0 (04aac6f60f4c5e1b3eb49dc09374029d) 
switched from RUNNING to FAILED with failure cause: 
   java.lang.RuntimeException: Same path is  
=hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet
 last send subtask = 0 ,cur send subtask = 28   
    
   ```
   
   @rdblue Please have a look

