[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401721#comment-16401721
 ] 

Grzegorz Kołakowski edited comment on BEAM-2393 at 3/16/18 10:36 AM:
---------------------------------------------------------------------

The times are very low: usually <= 30ms, occasionally around 150ms.

I've manually built Flink 1.4.0 with one additional log statement in 
{{org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint}}:
{code:java}
private boolean performCheckpoint(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics) throws Exception {

   LOG.debug("Starting checkpoint ({}) {} on task {}",
      checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

   synchronized (lock) {
      LOG.debug("Lock acquired {}. task {}", lock, getName()); // new line{code}
The corresponding logs are as follows:
{noformat}
2018-03-16 09:48:36,962 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (1) SAVEPOINT on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,953 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Lock acquired. task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,989 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 1 on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,990 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished synchronous part of checkpoint 1.Alignment duration: 0 ms, snapshot duration 33 ms
2018-03-16 09:48:52,012 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished asynchronous part of checkpoint 1. Asynchronous duration: 22 ms
{noformat}
In my last attempt, the entire checkpoint took ~16s. Note that the thread 
spent about 15s of that waiting to acquire the lock (09:48:36,962 to 
09:48:51,953 in the log above).
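
For reference, the lock wait can be read directly off the log timestamps. Below is a minimal sketch of that arithmetic; the class and method names are made up for illustration, and the timestamps are copied from the log above:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink or Beam: computes the time
// elapsed between two log timestamps in the format used above.
final class LockWaitFromLogs {

    // Matches timestamps such as "2018-03-16 09:48:36,962".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static long millisBetween(String start, String end) {
        LocalDateTime from = LocalDateTime.parse(start, LOG_TIME);
        LocalDateTime to = LocalDateTime.parse(end, LOG_TIME);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        String started = "2018-03-16 09:48:36,962";      // "Starting checkpoint"
        String lockAcquired = "2018-03-16 09:48:51,953"; // "Lock acquired"
        String syncDone = "2018-03-16 09:48:51,989";     // "Finished synchronous checkpoints"

        // Time spent blocked on synchronized (lock)
        System.out.println("lock wait ms: " + millisBetween(started, lockAcquired));
        // Time spent inside the lock until the synchronous part finished
        System.out.println("sync part ms: " + millisBetween(lockAcquired, syncDone));
    }
}
```

On these timestamps the wait for the lock is 14991 ms, while the work done after acquiring it takes 36 ms, so nearly all of the checkpoint time is spent blocked on {{synchronized (lock)}}.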


> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -----------------------------------------------------------------
>
>                 Key: BEAM-2393
>                 URL: https://issues.apache.org/jira/browse/BEAM-2393
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Grzegorz Kołakowski
>            Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so 
> when the job restarts after a failure, it sends duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
