[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401721#comment-16401721 ]
Grzegorz Kołakowski edited comment on BEAM-2393 at 3/16/18 10:36 AM:
---------------------------------------------------------------------

The times are very low: usually <= 30 ms, occasionally around 150 ms.

I manually built Flink 1.4.0 with one additional log statement in {{org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint}}:
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        // Added line: logs the moment the checkpoint lock is actually obtained.
        LOG.debug("Lock acquired {}. task {}", lock, getName());
        // ... rest of the method unchanged
{code}

The corresponding logs are as follows:
{noformat}
2018-03-16 09:48:36,962 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (1) SAVEPOINT on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,953 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Lock acquired. task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,989 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 1 on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,990 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished synchronous part of checkpoint 1.Alignment duration: 0 ms, snapshot duration 33 ms
2018-03-16 09:48:52,012 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished asynchronous part of checkpoint 1. Asynchronous duration: 22 ms
{noformat}

In my last try, the entire checkpoint took ~16 s. Note that the thread waited about 15 s to acquire the lock (from 09:48:36,962 to 09:48:51,953), whereas the synchronous and asynchronous parts of the snapshot took only 33 ms and 22 ms respectively.
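As a quick cross-check of the timings discussed in the comment, the gaps between the quoted log timestamps can be recomputed mechanically. This is a minimal sketch: the class and method names ({{CheckpointLogTimings}}, {{millisBetween}}) are hypothetical, and only the timestamp values and their {{yyyy-MM-dd HH:mm:ss,SSS}} layout come from the log excerpt.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: measures gaps between log timestamps.
public class CheckpointLogTimings {
    // Layout of the timestamps in the quoted Flink DEBUG log lines.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, LOG_TS),
                                LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        String start = "2018-03-16 09:48:36,962";        // "Starting checkpoint (1) SAVEPOINT"
        String lockAcquired = "2018-03-16 09:48:51,953"; // "Lock acquired."
        String asyncDone = "2018-03-16 09:48:52,012";    // "finished asynchronous part"

        // prints: lock wait: 14991 ms
        System.out.println("lock wait: " + millisBetween(start, lockAcquired) + " ms");
        // prints: start to async done: 15050 ms
        System.out.println("start to async done: " + millisBetween(start, asyncDone) + " ms");
    }
}
```

Almost all of the ~15 s visible on this task is spent before {{Lock acquired}} is logged; the work done while holding the lock accounts for only tens of milliseconds. These figures cover only the five log lines above.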
> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -----------------------------------------------------------------
>
>                 Key: BEAM-2393
>                 URL: https://issues.apache.org/jira/browse/BEAM-2393
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Grzegorz Kołakowski
>            Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when the job restarts after a failure, it sends duplicate data.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
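To illustrate what the missing snapshot()/restore() means for {{BoundedSourceWrapper}}, here is a minimal, self-contained sketch. It deliberately does not use the Beam or Flink APIs: {{ResumableBoundedSource}}, {{Position}}, {{emit}} and the in-memory splits are hypothetical stand-ins. The assumed idea is that the wrapper records which split it is reading and how many records of that split it has already emitted, and on restart resumes from that position instead of re-reading every split from the beginning.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT the Beam/Flink API: a bounded source over in-memory
// splits that can snapshot and restore its read position.
public class ResumableBoundedSource {
    /** Read position: current split index and records already emitted from it. */
    record Position(int split, int offset) {}

    private final List<List<String>> splits;
    private int split = 0;
    private int offset = 0;

    ResumableBoundedSource(List<List<String>> splits) {
        this.splits = splits;
    }

    /** Emits up to maxRecords records into out; returns false once every split is fully read. */
    boolean emit(int maxRecords, List<String> out) {
        int emitted = 0;
        while (split < splits.size() && emitted < maxRecords) {
            List<String> current = splits.get(split);
            if (offset < current.size()) {
                out.add(current.get(offset++));
                emitted++;
            } else {
                split++;   // current split exhausted, move to the next one
                offset = 0;
            }
        }
        return split < splits.size();
    }

    Position snapshot() {
        return new Position(split, offset);
    }

    void restore(Position p) {
        split = p.split();
        offset = p.offset();
    }

    /** Simulates: emit, checkpoint, emit more, crash, restart from the checkpoint. */
    static List<String> runWithFailureAndRestore(List<List<String>> input) {
        List<String> committed = new ArrayList<>();

        ResumableBoundedSource first = new ResumableBoundedSource(input);
        first.emit(2, committed);
        Position checkpoint = first.snapshot();
        first.emit(2, new ArrayList<>()); // not covered by the checkpoint: discarded, replayed later

        ResumableBoundedSource second = new ResumableBoundedSource(input);
        second.restore(checkpoint);       // resume instead of starting from the first split
        while (second.emit(10, committed)) {
            // keep reading until every split is exhausted
        }
        return committed;
    }

    public static void main(String[] args) {
        List<List<String>> input = List.of(List.of("a", "b", "c"), List.of("d", "e"));
        System.out.println(runWithFailureAndRestore(input)); // prints [a, b, c, d, e]
    }
}
```

If the second instance skipped {{restore(checkpoint)}}, it would start again from "a" and the output would become [a, b, a, b, c, d, e]: the records emitted before the checkpoint are sent twice, which is the duplication described in the issue.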