[
https://issues.apache.org/jira/browse/FLINK-27823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601100#comment-17601100
]
Kai Chen edited comment on FLINK-27823 at 9/7/22 2:01 AM:
----------------------------------------------------------
Hi [~suheng.cloud]. We met this problem too. Have you resolved it? Could you
share your solution?
We currently just catch the IllegalArgumentException, ignore it, and continue.
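For reference, a minimal self-contained sketch of that workaround (the class name
and surrounding fields are illustrative only, not Flink's actual code): when the
checkpointId is missing from the watermark map, commit nothing for that checkpoint
instead of throwing.
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

/**
 * Sketch of the workaround described above: when the commit message carries a
 * checkpointId that is not in the watermark map, skip the commit instead of
 * failing the job. Illustrative only, not Flink's actual code.
 */
public class LenientPartitionTimeCommitTrigger {

    // checkpointId -> watermark, mirroring the map used by PartitionTimeCommitTrigger.
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    public List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            // The original code throws IllegalArgumentException here; we log and commit nothing.
            System.err.printf(
                    "Checkpoint(%d) has not been snapshot, skipping commit. Watermarks: %s%n",
                    checkpointId, watermarks);
            return Collections.emptyList();
        }
        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();
        // ... the original partition-selection logic based on `watermark` would follow here ...
        return new ArrayList<>();
    }
}
{code}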
was (Author: yuchuanchen):
Hi [~suheng.cloud]. We met this problem too. Have you resolved it? Could you
share your solution?
> Standalone job continuously restarts due to illegal checkpointId check in
> PartitionTimeCommitTrigger when using FilesystemTableSink
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27823
> URL: https://issues.apache.org/jira/browse/FLINK-27823
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.6
> Reporter: suheng.cloud
> Priority: Major
> Attachments: image-2022-05-28-15-33-04-632.png,
> image-2022-05-28-15-34-23-397.png, image-2022-05-28-15-34-46-698.png,
> image-2022-05-28-15-35-02-186.png, image-2022-05-28-15-35-09-874.png,
> image-2022-05-28-15-35-23-708.png
>
>
> Hi community,
> When I set up a standalone job to read from a Kafka topic and sink to HDFS, I
> found the job continuously restarts after running normally for 4 hours.
> When the first restart shows up, the logs look like:
>
> {noformat}
> 2022-05-28 00:24:04,861 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
> checkpoint 26 (type=CHECKPOINT) @ 1653668644856 for job
> 00000000000000000000000000000000.
> 2022-05-28 00:34:04,861 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 26
> of job 00000000000000000000000000000000 expired before completing.
> 2022-05-28 00:34:04,866 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
> checkpoint 27 (type=CHECKPOINT) @ 1653669244862 for job
> 00000000000000000000000000000000.
> 2022-05-28 00:41:02,208 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
> checkpoint 27 for job 00000000000000000000000000000000 (117373 bytes in
> 417284 ms).
> 2022-05-28 00:41:18,517 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> PartitionCommitter -> Sink: end (1/1) (7e16853a4d16a80f96a3e26e17f9d677)
> switched from RUNNING to FAILED on 192.168.1.142:6122-0b54e0 @ 192.168.1.142
> (dataPort=43131).
> java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The
> watermark information is:
> {27=1653668944610}
> .
> at
> org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
> ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
> ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
> ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> ~[hamal-driver-1.13.6-v1.jar:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[hamal-driver-1.13.6-v1.jar:?]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> 2022-05-28 00:41:18,524 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> [] - Calculating tasks to restart to recover the failed task
> cc0206f9bd17ee99dc4565713cd749d7_0.
> 2022-05-28 00:41:18,525 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> [] - 50 tasks should be restarted to recover the failed task
> cc0206f9bd17ee99dc4565713cd749d7_0.
> 2022-05-28 00:41:18,525 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx
> (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
> 2022-05-28 00:41:18,526 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
> (24/24) (8d5ae185e722482d8b1ff4bc3ba60e86) switched from RUNNING to CANCELING.
> 2022-05-28 00:41:18,526 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
> (23/24) (369af96456d991046eb10cfee44df415) switched from RUNNING to CANCELING.
> ...
> ...
> {noformat}
>
> After that, the job restarts and successfully restores state from the
> checkpoint (using state.checkpoint-storage=jobmanager), and the following
> checkpoints (27/28/29/...) also finish successfully. But it seems the recovered
> state tries to report the commit message of the old checkpoint 26 to the
> PartitionCommitter, which continuously causes failures.
> Finally the job restarts again and again, with the same error log:
>
> {noformat}
> 2022-05-28 08:36:23,718 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> PartitionCommitter -> Sink: end (1/1) (669dfe28f49ec9b08cb1f605b7e1af86)
> switched from RUNNING to FAILED on 192.168.1.226:6122-ac5e98 @ 192.168.1.226
> (dataPort=41827).
> java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The
> watermark information is:
> {27=1653668944610, 28=1653669762385, 29=1653669973437, 30=1653670045517,
> 31=1653670584329, 32=1653671198834, 33=1653671316604, 34=1653671595057,
> 35=1653671632382, 36=1653671940262, 37=1653672247793, 38=1653672513421,
> 39=1653672626251, 40=1653672872425, 41=1653673029517, 42=1653673662173,
> 43=1653673843265, 44=1653674382981, 45=1653674739299, 46=1653674890522,
> 47=1653675402372, 48=1653675767340, 49=1653676205712, 50=1653676376692,
> 51=1653676762574, 52=1653677105303, 53=1653677254604, 54=1653677458683,
> 55=1653677651603, 57=1653678458691, 58=1653678931845, 59=1653679306742,
> 60=1653679845020, 61=1653680406114, 62=1653680981416, 63=1653681545056,
> 64=1653681584696, 65=1653681622029, 66=1653682017861, 67=1653682319529,
> 68=1653682404672, 69=1653682559904, 70=1653682804993, 71=1653682907991,
> 72=1653683279780, 73=1653683905573, 74=1653684156034, 75=1653684659397,
> 76=1653684975030, 77=1653685329183, 78=1653685862724, 79=1653686499090,
> 80=1653686636903, 81=1653686780782, 82=1653687053096, 83=1653687541953,
> 84=1653688012617, 85=1653688337464, 86=1653688832762, 87=1653689195316,
> 88=1653689330027, 89=1653689545859, 90=1653689957313, 91=1653690069643,
> 92=1653690689424, 93=1653690963316, 94=1653691164532, 95=1653691687307,
> 96=1653691885408, 97=1653692235231, 98=1653692428716, 99=1653692849146,
> 100=1653693274253, 101=1653693438601, 102=1653694097925, 103=1653694716179,
> 104=1653694770858, 105=1653695305421, 106=1653695464923, 107=1653695959050,
> 108=1653696465917, 109=1653696825723, 110=1653696841452, 111=1653697238699,
> 112=1653697882510}
> .
> at
> org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
> ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
> ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
> ~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
> ~[flink-dist_2.12-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> ~[hamal-driver-1.13.6-v1.jar:?]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[hamal-driver-1.13.6-v1.jar:?]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
> 2022-05-28 08:36:23,718 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> [] - Calculating tasks to restart to recover the failed task
> cc0206f9bd17ee99dc4565713cd749d7_0.
> 2022-05-28 08:36:23,719 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> [] - 50 tasks should be restarted to recover the failed task
> cc0206f9bd17ee99dc4565713cd749d7_0.
> 2022-05-28 08:36:23,719 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx
> (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
> 2022-05-28 08:36:23,719 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
> (24/24) (66177c2069b9aeef21376d7a780ceadb) switched from RUNNING to CANCELING.
> 2022-05-28 08:36:23,719 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
> (23/24) (7028f34c1756cd1fff5cf25dd12fd550) switched from RUNNING to CANCELING.
> {noformat}
> The job logic is very simple; the Flink SQL looks like:
> {code:java}
> CREATE TEMPORARY TABLE filesystem_sink_table (....)
> PARTITIONED BY (`dt`, `hour`, `topic`) WITH (
>     'connector' = 'filesystem',
>     'format' = 'textfile',
>     'sink.partition-commit.trigger' = 'partition-time',
>     'sink.partition-commit.delay' = '1 hour',
>     'sink.partition-commit.policy.kind' = 'success-file',
>     'auto-compaction' = 'true'
>     ...
> );
> CREATE TEMPORARY TABLE kafka_source_table ...
> streamTableEnv.executeSql(
>     "INSERT INTO filesystem_sink_table SELECT ... FROM kafka_source_table");
> {code}
>
>
> I have checked the source of PartitionTimeCommitTrigger, and what puzzles me
> is that it seems the watermarks map should only remove the committed
> checkpointId after passing the validation:
>
> {code:java}
> ...
> if (!watermarks.containsKey(checkpointId)) {
>     throw new IllegalArgumentException(
>             String.format(
>                     "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
>                     checkpointId, watermarks));
> }
> long watermark = watermarks.get(checkpointId);
> watermarks.headMap(checkpointId, true).clear();
> ...
> {code}
> I have no idea in which scenario this exception should be encountered. Did I
> misconfigure something, or is there some inconsistent state?
> Thanks for any help.
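> To make the question concrete, here is a standalone sketch of the quoted
> validation/clearing logic so the failing sequence can be replayed in isolation.
> The class, the simplified return value, and the event sequence in main() are
> assumptions that merely reproduce the observed message (a map containing only
> {27=...}); they are not a confirmed root cause.
> {code:java}
> import java.util.TreeMap;
>
> /** Standalone sketch of the validation + clearing logic quoted above. */
> public class CommitTriggerDemo {
>
>     // checkpointId -> watermark, as in the snippet above.
>     private final TreeMap<Long, Long> watermarks = new TreeMap<>();
>
>     // Called on snapshotState: remember the watermark for this checkpoint id.
>     void snapshot(long checkpointId, long watermark) {
>         watermarks.put(checkpointId, watermark);
>     }
>
>     // The quoted logic (the real method returns the committable partitions
>     // computed from the watermark; that part is omitted here).
>     long committablePartitions(long checkpointId) {
>         if (!watermarks.containsKey(checkpointId)) {
>             throw new IllegalArgumentException(
>                     String.format(
>                             "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
>                             checkpointId, watermarks));
>         }
>         long watermark = watermarks.get(checkpointId);
>         watermarks.headMap(checkpointId, true).clear();
>         return watermark;
>     }
>
>     public static void main(String[] args) {
>         CommitTriggerDemo trigger = new CommitTriggerDemo();
>         // Assume checkpoint 26 expires before this operator records it,
>         // so only 27 ends up in the map:
>         trigger.snapshot(27L, 1653668944610L);
>         // A commit message for checkpoint 26 still arrives and fails the check:
>         trigger.committablePartitions(26L); // throws: Checkpoint(26) has not been snapshot...
>     }
> }
> {code}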
>
>
> some runtime snapshot:
> !image-2022-05-28-15-35-23-708.png!
> !image-2022-05-28-15-35-09-874.png!
> !image-2022-05-28-15-35-02-186.png!