[
https://issues.apache.org/jira/browse/FLINK-27823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
suheng.cloud updated FLINK-27823:
---------------------------------
Description:
Hi, community,
I set up a standalone job that reads from a Kafka topic and sinks to HDFS. After
running normally for about 4 hours, the job started restarting continuously.
At the first restart, the logs look like this:
{noformat}
2022-05-28 00:24:04,861 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 26 (type=CHECKPOINT) @ 1653668644856 for job
00000000000000000000000000000000.
2022-05-28 00:34:04,861 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 26 of
job 00000000000000000000000000000000 expired before completing.
2022-05-28 00:34:04,866 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 27 (type=CHECKPOINT) @ 1653669244862 for job
00000000000000000000000000000000.
2022-05-28 00:41:02,208 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 27 for job 00000000000000000000000000000000 (117373 bytes in 417284
ms).
2022-05-28 00:41:18,517 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter
-> Sink: end (1/1) (7e16853a4d16a80f96a3e26e17f9d677) switched from RUNNING to
FAILED on 192.168.1.142:6122-0b54e0 @ 192.168.1.142 (dataPort=43131).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The
watermark information is:
{27=1653668944610}
.
at
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 00:41:18,524 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 00:41:18,525 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 50 tasks should be restarted to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 00:41:18,525 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx
(00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 00:41:18,526 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(24/24) (8d5ae185e722482d8b1ff4bc3ba60e86) switched from RUNNING to CANCELING.
2022-05-28 00:41:18,526 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(23/24) (369af96456d991046eb10cfee44df415) switched from RUNNING to CANCELING.
...
...
{noformat}
After that, the job restarts and successfully restores state from the checkpoint
(using state.checkpoint-storage=jobmanager), and the following checkpoints
(27/28/29/...) also complete successfully. But it seems the recovered
state tries to report a commit message for the old checkpoint 26 to the
PartitionCommitter, which causes the failure over and over.
In the end the job restarts again and again, each time with the same error:
{noformat}
2022-05-28 08:36:23,718 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PartitionCommitter
-> Sink: end (1/1) (669dfe28f49ec9b08cb1f605b7e1af86) switched from RUNNING to
FAILED on 192.168.1.226:6122-ac5e98 @ 192.168.1.226 (dataPort=41827).
java.lang.IllegalArgumentException: Checkpoint(26) has not been snapshot. The
watermark information is:
{27=1653668944610, 28=1653669762385, 29=1653669973437, 30=1653670045517,
31=1653670584329, 32=1653671198834, 33=1653671316604, 34=1653671595057,
35=1653671632382, 36=1653671940262, 37=1653672247793, 38=1653672513421,
39=1653672626251, 40=1653672872425, 41=1653673029517, 42=1653673662173,
43=1653673843265, 44=1653674382981, 45=1653674739299, 46=1653674890522,
47=1653675402372, 48=1653675767340, 49=1653676205712, 50=1653676376692,
51=1653676762574, 52=1653677105303, 53=1653677254604, 54=1653677458683,
55=1653677651603, 57=1653678458691, 58=1653678931845, 59=1653679306742,
60=1653679845020, 61=1653680406114, 62=1653680981416, 63=1653681545056,
64=1653681584696, 65=1653681622029, 66=1653682017861, 67=1653682319529,
68=1653682404672, 69=1653682559904, 70=1653682804993, 71=1653682907991,
72=1653683279780, 73=1653683905573, 74=1653684156034, 75=1653684659397,
76=1653684975030, 77=1653685329183, 78=1653685862724, 79=1653686499090,
80=1653686636903, 81=1653686780782, 82=1653687053096, 83=1653687541953,
84=1653688012617, 85=1653688337464, 86=1653688832762, 87=1653689195316,
88=1653689330027, 89=1653689545859, 90=1653689957313, 91=1653690069643,
92=1653690689424, 93=1653690963316, 94=1653691164532, 95=1653691687307,
96=1653691885408, 97=1653692235231, 98=1653692428716, 99=1653692849146,
100=1653693274253, 101=1653693438601, 102=1653694097925, 103=1653694716179,
104=1653694770858, 105=1653695305421, 106=1653695464923, 107=1653695959050,
108=1653696465917, 109=1653696825723, 110=1653696841452, 111=1653697238699,
112=1653697882510}
.
at
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTrigger.committablePartitions(PartitionTimeCommitTrigger.java:122)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.commitPartitions(PartitionCommitter.java:151)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.table.filesystem.stream.PartitionCommitter.processElement(PartitionCommitter.java:143)
~[flink-table-blink_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
~[hamal-driver-1.13.6-v1.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
~[hamal-driver-1.13.6-v1.jar:?]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_221]
2022-05-28 08:36:23,718 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 50 tasks should be restarted to recover the failed task
cc0206f9bd17ee99dc4565713cd749d7_0.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job xxxxxx
(00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(24/24) (66177c2069b9aeef21376d7a780ceadb) switched from RUNNING to CANCELING.
2022-05-28 08:36:23,719 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - compact-operator
(23/24) (7028f34c1756cd1fff5cf25dd12fd550) switched from RUNNING to CANCELING.
{noformat}
The job logic is very simple. The Flink SQL looks like this:
{code:java}
CREATE TEMPORARY TABLE filesystem_sink_table(....)
PARTITIONED BY(`dt`,`hour`,`topic`) WITH(
'connector'='filesystem',
'format'='textfile',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 hour',
'sink.partition-commit.policy.kind'='success-file',
'auto-compaction' = 'true'
...
);
CREATE TEMPORARY TABLE kafka_source_table ...
streamTableEnv.executeSql("INSERT INTO filesystem_sink_table SELECT ... FROM kafka_source_table");
{code}
I have checked the source of PartitionTimeCommitTrigger. What puzzles me is
that the watermarks map should only remove a committed checkpointId
after it passes the validation:
{code:java}
...
if (!watermarks.containsKey(checkpointId)) {
    throw new IllegalArgumentException(
            String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
...
{code}
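To make the question concrete, here is a minimal sketch of how the check quoted above behaves when a commit arrives for a checkpoint id that was never recorded. It is only a simplified model under stated assumptions: the class name WatermarkCheckSketch and its snapshot/commit methods are hypothetical stand-ins and not Flink's actual API. Only the containsKey check and the headMap(...).clear() call are taken from the snippet above, and the map is assumed to be a TreeMap keyed by checkpoint id.

```java
import java.util.TreeMap;

/** Simplified, hypothetical model of the quoted check; not Flink's actual class. */
public class WatermarkCheckSketch {
    // checkpointId -> watermark recorded when that checkpoint was snapshotted (assumed layout)
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    /** Hypothetical stand-in for the step that records a watermark per checkpoint. */
    public void snapshot(long checkpointId, long watermark) {
        watermarks.put(checkpointId, watermark);
    }

    /** Same validation as the quoted snippet: fail on an unknown id, else prune ids <= checkpointId. */
    public long commit(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                            checkpointId, watermarks));
        }
        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();
        return watermark;
    }

    public static void main(String[] args) {
        WatermarkCheckSketch trigger = new WatermarkCheckSketch();
        // Only checkpoint 27 is present, as in the first failure log.
        trigger.snapshot(27L, 1653668944610L);
        try {
            trigger.commit(26L); // id 26 was never recorded in the map
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // The failed call throws before pruning, so committing 27 still succeeds.
        System.out.println(trigger.commit(27L));
    }
}
```

In this model the exception is thrown whenever a commit message carries an id that is absent from the map, either because it was never recorded or because an earlier commit for a higher id already pruned it. Whether one of these is what actually happens in the job after checkpoint 26 expired is exactly what I am unsure about.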
I have no idea in which scenario this exception should occur. Did I misconfigure
something, or is there some inconsistent state?
Thanks for any help.
Some runtime snapshots:
!image-2022-05-28-15-35-23-708.png!
!image-2022-05-28-15-35-09-874.png!
!image-2022-05-28-15-35-02-186.png!
> Standalone Job continously restart by illegal checkpointId check on
> PartitionTimeCommitTrigger when use FilesystemTableSink
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27823
> URL: https://issues.apache.org/jira/browse/FLINK-27823
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.6
> Reporter: suheng.cloud
> Priority: Major
> Attachments: image-2022-05-28-15-33-04-632.png,
> image-2022-05-28-15-34-23-397.png, image-2022-05-28-15-34-46-698.png,
> image-2022-05-28-15-35-02-186.png, image-2022-05-28-15-35-09-874.png,
> image-2022-05-28-15-35-23-708.png
>
>