[ 
https://issues.apache.org/jira/browse/FLINK-24539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432172#comment-17432172
 ] 

vmaster.cc commented on FLINK-24539:
------------------------------------

[~pnowojski] Thank you very much, i have subscribed the user mailling list. I 
presume it's busy recovering it's state too, so i have optimized the logic to 
avoid using Flink to process full data. 

You said that 'unless you are using unaligned checkpoints', how can i use this? 

> ChangelogNormalize operator tooks too long time to INITIALIZING until failed
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-24539
>                 URL: https://issues.apache.org/jira/browse/FLINK-24539
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task, Table SQL / 
> Runtime
>    Affects Versions: 1.13.1
>         Environment: Flink version :1.13.1
> TaskManager memory:
> !image-2021-10-14-13-36-56-899.png|width=578,height=318!
> JobManager memory:
> !image-2021-10-14-13-37-51-445.png|width=578,height=229!
>            Reporter: vmaster.cc
>            Priority: Major
>         Attachments: image-2021-10-14-13-19-08-215.png, 
> image-2021-10-14-13-36-56-899.png, image-2021-10-14-13-37-51-445.png, 
> image-2021-10-14-14-13-13-370.png, image-2021-10-14-14-15-40-101.png, 
> image-2021-10-14-14-16-33-080.png, 
> taskmanager_container_e11_1631768043929_0012_01_000004_log.txt
>
>
> I'm using debezium to produce cdc from mysql, considering its at least one 
> delivery, so i must set the config 
> 'table.exec.source.cdc-events-duplicate=true'.
> But when some unknown case make my task down, flink task restart  failed 
> always. I found that ChangelogNormalize operator tooks too long time in 
> INITIALIZING stage.
>  
> screenshot and log fragment are as follows:
> !image-2021-10-14-13-19-08-215.png|width=567,height=293!
>  
> {code:java}
> 2021-10-14 12:32:33,660 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
> Finished building RocksDB keyed state-backend at 
> /data3/yarn/nm/usercache/flink/appcache/application_1631768043929_0012/flink-io-f31735c3-e726-4c49-89a5-916670809b7a/job_7734977994a6a10f7cc784d50e4a1a34_op_KeyedProcessOperator_dc2290bb6f8f5cd2bd425368843494fe__1_1__uuid_6cbbe6ae-f43e-4d2a-b1fb-f0cb71f257af.2021-10-14
>  12:32:33,662 INFO  org.apache.flink.runtime.taskmanager.Task                 
>    [] - GroupAggregate(groupBy=[teacher_id, create_day], select=[teacher_id, 
> create_day, SUM_RETRACT($f2) AS teacher_courseware_count]) -> 
> Calc(select=[teacher_id, create_day, CAST(teacher_courseware_count) AS 
> teacher_courseware_count]) -> NotNullEnforcer(fields=[teacher_id, 
> create_day]) (1/1)#143 (9cca3ef1293cc6364698381bbda93998) switched from 
> INITIALIZING to RUNNING.2021-10-14 12:38:07,581 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Ignoring 
> checkpoint aborted notification for non-running task 
> ChangelogNormalize(key=[c_id]) -> Calc(select=[c_author_id AS teacher_id, 
> DATE_FORMAT(c_create_time, _UTF-16LE'yyyy-MM-dd') AS create_day, IF((c_state 
> = 10), 1, 0) AS $f2], where=[((c_is_group = 0) AND (c_author_id <> 
> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) 
> (1/1)#143.2021-10-14 12:38:07,581 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Attempting 
> to cancel task Sink: 
> Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count],
>  fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 
> (cc25f9ae49c4db01ab40ff103fae43fd).2021-10-14 12:38:07,581 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Sink: 
> Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count],
>  fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 
> (cc25f9ae49c4db01ab40ff103fae43fd) switched from RUNNING to 
> CANCELING.2021-10-14 12:38:07,581 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Triggering 
> cancellation of task code Sink: 
> Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count],
>  fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 
> (cc25f9ae49c4db01ab40ff103fae43fd).2021-10-14 12:38:07,583 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Attempting 
> to cancel task Sink: 
> Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count],
>  fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 
> (5419f41a3f0cc6c2f3f4c82c87f4ae22).2021-10-14 12:38:07,583 INFO  
> org.apache.flink.runtime.taskmanager.Task                    [] - Sink: 
> Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count],
>  fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 
> (5419f41a3f0cc6c2f3f4c82c87f4ae22) switched from RUNNING to CANCELING.
> {code}
>  
> attention:
> 1、The table has a large amount of data, up to 500 million. 
> 2、Because the amount of data is very large, the rocksdb state backend is used
> 3、More other env infos ,see next section and the full log see attachment.
> {code:java}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to