xuzhiwen1255 opened a new issue, #6531:
URL: https://github.com/apache/iceberg/issues/6531

   ### Apache Iceberg version
   
   1.1.0 (latest release)
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   After enabling upsert on my Flink table, I pre-loaded 3,000,000 rows and then read the table in streaming mode. While that backlog is being consumed, every checkpoint fails; once all the existing data in the table has been read, checkpointing works properly.
   
   My analysis: the upstream monitor sends so many splits that the pipeline backs up, so the reader operator cannot receive the checkpoint barrier promptly. The checkpoint therefore cannot begin, and it eventually times out.
   
   Like the following:

   ```text
   --------------------------------------------------
   barrier, split, split, split, split, split, split, split | --- split ---> reader: processing split --> reads file
   --------------------------------------------------
                     .. processing ..
   --------------------------------------------------
   split, barrier, split, split, split, barrier, split, split | --- split ---> reader: processing barrier --> triggers checkpoint
   --------------------------------------------------
   ```
   The barrier is processed, and the checkpoint triggered, only after every split queued ahead of it has been processed.
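   To make the timing concrete, here is a rough back-of-the-envelope estimate. This is only a sketch: the split count, per-split time, and checkpoint timeout below are illustrative assumptions, not numbers measured from this job.

   ```java
   public class BarrierDelayEstimate {
       public static void main(String[] args) {
           // All numbers are hypothetical, for illustration only.
           int splitsQueuedBeforeBarrier = 50;    // splits the monitor emitted ahead of the barrier
           double secondsPerSplit = 30.0;         // slow because upsert delete files must be applied
           double checkpointTimeoutSeconds = 600; // Flink's default checkpoint timeout is 10 minutes

           double barrierDelaySeconds = splitsQueuedBeforeBarrier * secondsPerSplit;
           System.out.printf("barrier delayed ~%.0f s behind queued splits (timeout: %.0f s)%n",
               barrierDelaySeconds, checkpointTimeoutSeconds);
           // 50 splits * 30 s = 1500 s >> 600 s, so every checkpoint expires
           // until the backlog of splits drains.
       }
   }
   ```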
   
   In my test, because upsert is enabled, a large amount of delete data is generated, so each split takes a long time to process. The reader operator is correspondingly slow, which causes backpressure.
   
   The split is read by the Flink task thread itself; because barrier handling runs on that same thread, split processing blocks it.
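   A minimal simulation of that single-threaded behavior (this is not the actual Flink or Iceberg source code, just a sketch of a mailbox-style loop in which split processing and barrier handling share one task thread):

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   public class SingleThreadReaderSketch {
       public static void main(String[] args) throws InterruptedException {
           // One queue, one thread: splits and barriers are handled strictly in order.
           Queue<String> mailbox = new ArrayDeque<>();
           for (int i = 1; i <= 5; i++) {
               mailbox.add("split-" + i);
           }
           mailbox.add("barrier-1"); // the barrier is queued *behind* the splits

           long start = System.currentTimeMillis();
           while (!mailbox.isEmpty()) {
               String event = mailbox.poll();
               if (event.startsWith("split")) {
                   Thread.sleep(200); // stand-in for reading a split (including applying deletes)
               } else {
                   System.out.printf("checkpoint triggered after %d ms%n",
                       System.currentTimeMillis() - start);
               }
           }
           // The barrier is only seen after all five splits finish; the thread
           // cannot interrupt an in-flight split to acknowledge the checkpoint earlier.
       }
   }
   ```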
   
   Here is my SQL:
   ```sql
   CREATE TABLE dg (
       id INT,
       c1 VARCHAR,  c2 VARCHAR,  c3 VARCHAR,  c4 VARCHAR,  c5 VARCHAR,  c6 VARCHAR,  c7 VARCHAR,  c8 VARCHAR,
       c9 VARCHAR,  c10 VARCHAR, c11 VARCHAR, c12 VARCHAR, c13 VARCHAR, c14 VARCHAR, c15 VARCHAR, c16 VARCHAR,
       c17 VARCHAR, c18 VARCHAR, c19 VARCHAR, c20 VARCHAR, c21 VARCHAR, c22 VARCHAR, c23 VARCHAR, c24 VARCHAR,
       c25 VARCHAR, c26 VARCHAR, c27 VARCHAR, c28 VARCHAR, c29 VARCHAR, c30 VARCHAR, c31 VARCHAR, c32 VARCHAR
   )
   WITH (
       'connector' = 'datagen',
       'rows-per-second' = '10000',
       'number-of-rows' = '300000',
       'fields.id.min' = '0',
       'fields.id.max' = '50000001',
       'fields.c1.length' = '20',  'fields.c2.length' = '20',  'fields.c3.length' = '20',  'fields.c4.length' = '20',
       'fields.c5.length' = '20',  'fields.c6.length' = '20',  'fields.c7.length' = '20',  'fields.c8.length' = '20',
       'fields.c9.length' = '20',  'fields.c10.length' = '20', 'fields.c11.length' = '20', 'fields.c12.length' = '20',
       'fields.c13.length' = '20', 'fields.c14.length' = '20', 'fields.c15.length' = '20', 'fields.c16.length' = '20',
       'fields.c17.length' = '20', 'fields.c18.length' = '20', 'fields.c19.length' = '20', 'fields.c20.length' = '20',
       'fields.c21.length' = '20', 'fields.c22.length' = '20', 'fields.c23.length' = '20', 'fields.c24.length' = '20',
       'fields.c25.length' = '20', 'fields.c26.length' = '20', 'fields.c27.length' = '20', 'fields.c28.length' = '20',
       'fields.c29.length' = '20', 'fields.c30.length' = '20', 'fields.c31.length' = '20', 'fields.c32.length' = '20'
   );
   
   
   CREATE TABLE hc.db.test1 (
       id INT,
       c1 VARCHAR,  c2 VARCHAR,  c3 VARCHAR,  c4 VARCHAR,  c5 VARCHAR,  c6 VARCHAR,  c7 VARCHAR,  c8 VARCHAR,
       c9 VARCHAR,  c10 VARCHAR, c11 VARCHAR, c12 VARCHAR, c13 VARCHAR, c14 VARCHAR, c15 VARCHAR, c16 VARCHAR,
       c17 VARCHAR, c18 VARCHAR, c19 VARCHAR, c20 VARCHAR, c21 VARCHAR, c22 VARCHAR, c23 VARCHAR, c24 VARCHAR,
       c25 VARCHAR, c26 VARCHAR, c27 VARCHAR, c28 VARCHAR, c29 VARCHAR, c30 VARCHAR, c31 VARCHAR, c32 VARCHAR,
       PRIMARY KEY (`id`) NOT ENFORCED
   ) WITH (
       'format-version' = '2',
       'write.upsert.enabled' = 'true'
   );
   
   
   CREATE TABLE hc.db.test4 (
       id INT,
       c1 VARCHAR,  c2 VARCHAR,  c3 VARCHAR,  c4 VARCHAR,  c5 VARCHAR,  c6 VARCHAR,  c7 VARCHAR,  c8 VARCHAR,
       c9 VARCHAR,  c10 VARCHAR, c11 VARCHAR, c12 VARCHAR, c13 VARCHAR, c14 VARCHAR, c15 VARCHAR, c16 VARCHAR,
       c17 VARCHAR, c18 VARCHAR, c19 VARCHAR, c20 VARCHAR, c21 VARCHAR, c22 VARCHAR, c23 VARCHAR, c24 VARCHAR,
       c25 VARCHAR, c26 VARCHAR, c27 VARCHAR, c28 VARCHAR, c29 VARCHAR, c30 VARCHAR, c31 VARCHAR, c32 VARCHAR,
       PRIMARY KEY (`id`) NOT ENFORCED
   ) WITH (
       'format-version' = '2',
       'write.upsert.enabled' = 'true'
   );
   
   -- streaming read
   INSERT INTO hc.db.test4 SELECT * FROM hc.db.test1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
   
   ```
   
   
   
![image](https://user-images.githubusercontent.com/105710753/210957322-b79123ef-ab9d-4b6d-8b63-2ec395354a4a.png)
   
   
   
![image](https://user-images.githubusercontent.com/105710753/210957211-7fd03036-c528-4af0-a3cb-baba16232848.png)
   
   
   

