vmaster.cc created FLINK-24539:
----------------------------------

             Summary: ChangelogNormalize operator takes too long in INITIALIZING until it fails
                 Key: FLINK-24539
                 URL: https://issues.apache.org/jira/browse/FLINK-24539
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Runtime / Task
    Affects Versions: 1.13.1
         Environment: Flink version: 1.13.1

TaskManager memory:

!image-2021-10-14-13-36-56-899.png|width=578,height=318!

JobManager memory:

!image-2021-10-14-13-37-51-445.png|width=578,height=229!
            Reporter: vmaster.cc
         Attachments: image-2021-10-14-13-19-08-215.png, 
image-2021-10-14-13-36-56-899.png, image-2021-10-14-13-37-51-445.png, 
taskmanager_container_e11_1631768043929_0012_01_000004_log.txt

I'm using Debezium to produce CDC events from MySQL. Because Debezium only guarantees at-least-once delivery, I must set 'table.exec.source.cdc-events-duplicate=true'.
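For context on why this operator holds so much state: with this option enabled, the plan contains a ChangelogNormalize operator keyed on the primary key (ChangelogNormalize(key=[c_id]) in the log) that keeps the last row seen for each key. The Java sketch below is a simplified, hypothetical model of that keep-last-row normalization, not Flink's implementation. The class, record, and method names are made up for illustration, and treating UPDATE_BEFORE like DELETE is an assumption of the sketch. It shows that a redelivered INSERT becomes an UPDATE_BEFORE/UPDATE_AFTER pair, so a downstream SUM_RETRACT does not double-count, and that the state keeps one entry per live key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of keyed "keep last row" normalization.
// This is NOT Flink's ChangelogNormalize code; all names are illustrative.
public class ChangelogNormalizeSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One change event for a row identified by its primary key (e.g. c_id).
    record Change(Kind kind, String key, int value) {}

    record Result(int downstreamSum, int keysInState) {}

    // Keyed state: one entry per live primary key. In the real job this kind
    // of state lives in the RocksDB state backend and is restored on restart.
    private final Map<String, Integer> lastRowByKey = new HashMap<>();

    List<Change> process(Change in) {
        List<Change> out = new ArrayList<>();
        Integer previous = lastRowByKey.get(in.key());
        if (in.kind() == Kind.INSERT || in.kind() == Kind.UPDATE_AFTER) {
            if (previous == null) {
                out.add(new Change(Kind.INSERT, in.key(), in.value()));
            } else {
                // A redelivered or updated row first retracts the stored version,
                // so a duplicate INSERT is not counted twice downstream.
                out.add(new Change(Kind.UPDATE_BEFORE, in.key(), previous));
                out.add(new Change(Kind.UPDATE_AFTER, in.key(), in.value()));
            }
            lastRowByKey.put(in.key(), in.value());
        } else if (previous != null) {
            // Assumption of this sketch: DELETE and UPDATE_BEFORE both retract the stored row.
            out.add(new Change(Kind.DELETE, in.key(), previous));
            lastRowByKey.remove(in.key());
        }
        // A retraction for a key with no stored row is dropped.
        return out;
    }

    int keysInState() {
        return lastRowByKey.size();
    }

    // Mimics a downstream SUM_RETRACT: additions add, retractions subtract.
    static int sumWithRetractions(List<Change> changes) {
        int sum = 0;
        for (Change c : changes) {
            boolean retract = c.kind() == Kind.UPDATE_BEFORE || c.kind() == Kind.DELETE;
            sum += retract ? -c.value() : c.value();
        }
        return sum;
    }

    static Result demo() {
        ChangelogNormalizeSketch op = new ChangelogNormalizeSketch();
        List<Change> emitted = new ArrayList<>();
        emitted.addAll(op.process(new Change(Kind.INSERT, "c1", 1)));
        emitted.addAll(op.process(new Change(Kind.INSERT, "c1", 1))); // at-least-once duplicate
        emitted.addAll(op.process(new Change(Kind.INSERT, "c2", 0)));
        emitted.addAll(op.process(new Change(Kind.DELETE, "c2", 0)));
        return new Result(sumWithRetractions(emitted), op.keysInState());
    }

    public static void main(String[] args) {
        Result r = demo();
        System.out.println("sum=" + r.downstreamSum() + " keysInState=" + r.keysInState());
    }
}
```

Running main prints sum=1 keysInState=1, whereas naively summing the four input values would give 2. The sketch does not model checkpoint restore, but it illustrates that the operator's state grows with the number of distinct keys, which for a table of up to 500 million rows keyed on c_id is large.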


However, when the job goes down for some unknown reason, the restart always fails: the ChangelogNormalize operator stays in the INITIALIZING state for a very long time.

 

A screenshot and a log fragment follow:

!image-2021-10-14-13-19-08-215.png|width=567,height=293!

 
{code:java}
2021-10-14 12:32:33,660 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /data3/yarn/nm/usercache/flink/appcache/application_1631768043929_0012/flink-io-f31735c3-e726-4c49-89a5-916670809b7a/job_7734977994a6a10f7cc784d50e4a1a34_op_KeyedProcessOperator_dc2290bb6f8f5cd2bd425368843494fe__1_1__uuid_6cbbe6ae-f43e-4d2a-b1fb-f0cb71f257af.
2021-10-14 12:32:33,662 INFO  org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[teacher_id, create_day], select=[teacher_id, create_day, SUM_RETRACT($f2) AS teacher_courseware_count]) -> Calc(select=[teacher_id, create_day, CAST(teacher_courseware_count) AS teacher_courseware_count]) -> NotNullEnforcer(fields=[teacher_id, create_day]) (1/1)#143 (9cca3ef1293cc6364698381bbda93998) switched from INITIALIZING to RUNNING.
2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task ChangelogNormalize(key=[c_id]) -> Calc(select=[c_author_id AS teacher_id, DATE_FORMAT(c_create_time, _UTF-16LE'yyyy-MM-dd') AS create_day, IF((c_state = 10), 1, 0) AS $f2], where=[((c_is_group = 0) AND (c_author_id <> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) (1/1)#143.
2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd) switched from RUNNING to CANCELING.
2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
2021-10-14 12:38:07,583 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22).
2021-10-14 12:38:07,583 INFO  org.apache.flink.runtime.taskmanager.Task [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22) switched from RUNNING to CANCELING.
{code}
 

Notes:

1. The table is very large, with up to 500 million rows.

2. Because of this data volume, the RocksDB state backend is used.

3. For more environment details, see the Environment field above; the full log is in the attachments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
