[ 
https://issues.apache.org/jira/browse/FLINK-27823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601100#comment-17601100
 ] 

Kai Chen commented on FLINK-27823:
----------------------------------

Hi [~suheng.cloud]. We ran into this problem, too. Have you resolved it? Could you 
share your solution?

> Standalone Job continuously restart by illegal checkpointId check on 
> PartitionTimeCommitTrigger when using FilesystemTableSink
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27823
>                 URL: https://issues.apache.org/jira/browse/FLINK-27823
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.13.6
>            Reporter: suheng.cloud
>            Priority: Major
>         Attachments: image-2022-05-28-15-33-04-632.png, 
> image-2022-05-28-15-34-23-397.png, image-2022-05-28-15-34-46-698.png, 
> image-2022-05-28-15-35-02-186.png, image-2022-05-28-15-35-09-874.png, 
> image-2022-05-28-15-35-23-708.png
>
>
> Hi community,
> I built a standalone job that reads from a Kafka topic and sinks to HDFS, and I 
> found that the job restarts continuously after running normally for 4 hours.
> When the first restart occurred, the logs looked like this:
>  
> {noformat}
> 2022-05-28 00:24:04,861 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
> checkpoint 26 (type=CHECKPOINT) @ 1653668644856 for job 
> 00000000000000000000000000000000.
> 2022-05-28 00:34:04,861 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 26 
> of job 00000000000000000000000000000000 expired before completing.
> 2022-05-28 00:34:04,866 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering 
> checkpoint 27 (type=CHECKPOINT) @ 1653669244862 for job 
> 00000000000000000000000000000000.
> 2022-05-28 00:41:02,208 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
> checkpoint 27 for job 00000000000000000000000000000000 (117373 bytes in 
> 417284 ms).
> 2022-05-28 00:41:18,517 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
> PartitionCommitter -> Sink: end (1/1) (7e16853a4d16a80f96a3e26e17f9d677) 
> switched from RUNNING to FAILED on 192.168.1.142:6122-0b54e0 @ 192.168.1.142 
> (dataPort=43131).
> java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The 
> watermark information is:
> {27=1653668944610}
> .
> at 
> org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
>  ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
>  ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
>  ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> ~[hamal-driver-1.13.6-v1.jar:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[hamal-driver-1.13.6-v1.jar:?]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> 2022-05-28 00:41:18,524 INFO 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> cc0206f9bd17ee99dc4565713cd749d7_0.
> 2022-05-28 00:41:18,525 INFO 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 50 tasks should be restarted to recover the failed task 
> cc0206f9bd17ee99dc4565713cd749d7_0. 
> 2022-05-28 00:41:18,525 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx 
> (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
> 2022-05-28 00:41:18,526 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
> (24/24) (8d5ae185e722482d8b1ff4bc3ba60e86) switched from RUNNING to CANCELING.
> 2022-05-28 00:41:18,526 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
> (23/24) (369af96456d991046eb10cfee44df415) switched from RUNNING to CANCELING.
> ...
> ...
> {noformat}
>  
> After that, the job restarts and successfully restores state from the 
> checkpoint (using state.checkpoint-storage=jobmanager), and the following 
> checkpoints (27/28/29/...) also complete successfully. But it seems the 
> recovered state tries to report a commit message for the old checkpoint 26 
> to the PartitionCommitter, which continuously causes failures.
> In the end the job restarts again and again, with the same error in the log:
>  
> {noformat}
> 2022-05-28 08:36:23,718 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - 
> PartitionCommitter -> Sink: end (1/1) (669dfe28f49ec9b08cb1f605b7e1af86) 
> switched from RUNNING to FAILED on 192.168.1.226:6122-ac5e98 @ 192.168.1.226 
> (dataPort=41827).
> java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The 
> watermark information is:
> {27=1653668944610, 28=1653669762385, 29=1653669973437, 30=1653670045517, 
> 31=1653670584329, 32=1653671198834, 33=1653671316604, 34=1653671595057, 
> 35=1653671632382, 36=1653671940262, 37=1653672247793, 38=1653672513421, 
> 39=1653672626251, 40=1653672872425, 41=1653673029517, 42=1653673662173, 
> 43=1653673843265, 44=1653674382981, 45=1653674739299, 46=1653674890522, 
> 47=1653675402372, 48=1653675767340, 49=1653676205712, 50=1653676376692, 
> 51=1653676762574, 52=1653677105303, 53=1653677254604, 54=1653677458683, 
> 55=1653677651603, 57=1653678458691, 58=1653678931845, 59=1653679306742, 
> 60=1653679845020, 61=1653680406114, 62=1653680981416, 63=1653681545056, 
> 64=1653681584696, 65=1653681622029, 66=1653682017861, 67=1653682319529, 
> 68=1653682404672, 69=1653682559904, 70=1653682804993, 71=1653682907991, 
> 72=1653683279780, 73=1653683905573, 74=1653684156034, 75=1653684659397, 
> 76=1653684975030, 77=1653685329183, 78=1653685862724, 79=1653686499090, 
> 80=1653686636903, 81=1653686780782, 82=1653687053096, 83=1653687541953, 
> 84=1653688012617, 85=1653688337464, 86=1653688832762, 87=1653689195316, 
> 88=1653689330027, 89=1653689545859, 90=1653689957313, 91=1653690069643, 
> 92=1653690689424, 93=1653690963316, 94=1653691164532, 95=1653691687307, 
> 96=1653691885408, 97=1653692235231, 98=1653692428716, 99=1653692849146, 
> 100=1653693274253, 101=1653693438601, 102=1653694097925, 103=1653694716179, 
> 104=1653694770858, 105=1653695305421, 106=1653695464923, 107=1653695959050, 
> 108=1653696465917, 109=1653696825723, 110=1653696841452, 111=1653697238699, 
> 112=1653697882510}
> .
> at 
> org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
>  ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
>  ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
>  ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
>  ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> ~[hamal-driver-1.13.6-v1.jar:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[hamal-driver-1.13.6-v1.jar:?]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> 2022-05-28 08:36:23,718 INFO 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> cc0206f9bd17ee99dc4565713cd749d7_0.
> 2022-05-28 08:36:23,719 INFO 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - 50 tasks should be restarted to recover the failed task 
> cc0206f9bd17ee99dc4565713cd749d7_0. 
> 2022-05-28 08:36:23,719 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx 
> (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
> 2022-05-28 08:36:23,719 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
> (24/24) (66177c2069b9aeef21376d7a780ceadb) switched from RUNNING to CANCELING.
> 2022-05-28 08:36:23,719 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator 
> (23/24) (7028f34c1756cd1fff5cf25dd12fd550) switched from RUNNING to CANCELING.
> {noformat}
> The job logic is very simple; the Flink SQL looks like this:
> {code:java}
> CREATE TEMPORARY TABLE filesystem_sink_table(....)
> PARTITIONED BY(`dt`,`hour`,`topic`) WITH(
> 'connector'='filesystem',
> 'format'='textfile',
> 'sink.partition-commit.trigger'='partition-time',
> 'sink.partition-commit.delay'='1 hour',
> 'sink.partition-commit.policy.kind'='success-file',
> 'auto-compaction' = 'true'
> ...
> );
> CREATE TEMPORARY TABLE kafka_source_table ...
> streamTableEnv.executeSql(
>     "INSERT INTO filesystem_sink_table SELECT ... FROM kafka_source_table");
> {code}
>  
>  
> I have checked the source of PartitionTimeCommitTrigger, and what puzzles me 
> is that it seems the watermarks map should only remove the committed 
> checkpointId after it passes the validation:
>  
> {code:java}
> ...
> if (!watermarks.containsKey(checkpointId)) {
>     throw new IllegalArgumentException(
>             String.format(
>                     "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
>                     checkpointId, watermarks));
> }
> long watermark = watermarks.get(checkpointId);
> watermarks.headMap(checkpointId, true).clear();
> ...
> {code}
> I have no idea in which scenario this exception should be encountered. Did I 
> misconfigure something, or is there some inconsistent state? 
> Thanks for any help.
>  
>  
> Some runtime snapshots:
> !image-2022-05-28-15-35-23-708.png!
> !image-2022-05-28-15-35-09-874.png!
> !image-2022-05-28-15-35-02-186.png!
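
For reference, the validation quoted above can be reproduced in isolation. The following is only a minimal sketch, not Flink code: the class name WatermarkCheckSketch and its snapshot/commit methods are hypothetical, and it assumes the watermarks are kept in a TreeMap keyed by checkpoint id (headMap(key, inclusive) is a NavigableMap method, so this seems plausible but is not confirmed by the report). Only the containsKey check and the headMap(checkpointId, true).clear() call are taken from the snippet in the issue.

```java
import java.util.TreeMap;

// Hypothetical stand-in for the trigger's bookkeeping; this is NOT the Flink class.
class WatermarkCheckSketch {
    // checkpoint id -> watermark recorded when that checkpoint was snapshot
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    // Assumed to be called once per snapshot; records the watermark for that id.
    void snapshot(long checkpointId, long watermark) {
        watermarks.put(checkpointId, watermark);
    }

    // Mirrors the quoted check: fail if the id is unknown, otherwise drop
    // this id and every smaller one, then return the recorded watermark.
    long commit(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                            checkpointId, watermarks));
        }
        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();
        return watermark;
    }

    public static void main(String[] args) {
        WatermarkCheckSketch trigger = new WatermarkCheckSketch();
        trigger.snapshot(27L, 1653668944610L);
        trigger.snapshot(28L, 1653669762385L);
        try {
            trigger.commit(26L); // id 26 was never recorded in this sketch
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        trigger.commit(28L); // also removes the entry for 27
        try {
            trigger.commit(27L); // fails too: 27 was cleared by the commit of 28
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, a commit request fails the check whenever its checkpoint id was never recorded or is not greater than an id that was already committed. The sketch does not explain why a commit request for checkpoint 26 reaches the PartitionCommitter in the reported job.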



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
