[ 
https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401721#comment-16401721
 ] 

Grzegorz Kołakowski edited comment on BEAM-2393 at 3/16/18 12:34 PM:
---------------------------------------------------------------------

The times are very low, usually <= 30ms, sometimes around 150ms.

I've manually built Flink 1.4.0 with one additional log statement in 
{{org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint}}:
{code:java}
private boolean performCheckpoint(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      CheckpointMetrics checkpointMetrics) throws Exception {

   LOG.debug("Starting checkpoint ({}) {} on task {}",
      checkpointMetaData.getCheckpointId(),
      checkpointOptions.getCheckpointType(), getName());

   synchronized (lock) {
      LOG.debug("Lock acquired {}. task {}", lock, getName()); // new line
      // ... rest of the method unchanged
   }
}
{code}
The corresponding logs are as follows:
{noformat}
2018-03-16 09:48:36,962 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (1) 
SAVEPOINT on task Source: PTransformTranslation.UnknownRawPTransform -> 
ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,953 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask - Lock acquired. task 
Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo 
-> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,989 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous 
checkpoints for checkpoint 1 on task Source: 
PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> 
Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,990 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask - Source: 
PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> 
Window.Into()/Window.Assign.out (1/1) - finished synchronous part of checkpoint 
1.Alignment duration: 0 ms, snapshot duration 33 ms
2018-03-16 09:48:52,012 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask - Source: 
PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> 
Window.Into()/Window.Assign.out (1/1) - finished asynchronous part of 
checkpoint 1. Asynchronous duration: 22 ms{noformat}
In my last try, the entire checkpoint took ~16s. Please note that the thread 
waited on the lock for 15s.
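
As a sanity check on the 15s figure, the wait can be recomputed from the 
"Starting checkpoint" and "Lock acquired" timestamps in the log above. This is 
only a minimal sketch; the class and method names are illustrative and not part 
of Flink or Beam.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class LockWait {
    // Timestamp layout used by the log lines, e.g. "2018-03-16 09:48:36,962".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    public static long millisBetween(String start, String end) {
        return Duration.between(
                LocalDateTime.parse(start, LOG_TS),
                LocalDateTime.parse(end, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        // "Starting checkpoint" -> "Lock acquired"
        long waitMs = millisBetween("2018-03-16 09:48:36,962", "2018-03-16 09:48:51,953");
        // "Lock acquired" -> "finished asynchronous part"
        long workMs = millisBetween("2018-03-16 09:48:51,953", "2018-03-16 09:48:52,012");
        System.out.println("Waited for lock: " + waitMs + " ms"); // prints 14991 ms
        System.out.println("Checkpoint work: " + workMs + " ms"); // prints 59 ms
    }
}
```

So, of the time covered by these log lines, roughly 15s is spent waiting for the 
lock and only about 59ms on the checkpoint itself.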
----
Let me explain briefly how I'm testing it. I have a text file which I process 
line by line. To each line I apply a ParDo that contains a short sleep, 
{{Thread.sleep(20)}}. Since I'm testing manually by running {{bin/flink 
savepoint <jobId> <path>}}, I added the sleep to be able to run the command 
before the job finishes.

The thread which is responsible for making the checkpoint waited 15s. According 
to my logs, the processing thread acquired the {{lock}} 738 times while the 
checkpointing thread was waiting for it.
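
The numbers are consistent with each other: 738 acquisitions x 20ms of sleep is 
about 14.8s, close to the 15s wait. The contention pattern can be reproduced 
outside Flink with a plain Java monitor. The sketch below is not Flink or Beam 
code; the names are hypothetical, and it assumes that each element is processed 
(including the sleep) while the lock is held. Java's intrinsic locks make no 
fairness guarantee, so the waiting thread may be overtaken repeatedly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class CheckpointLockDemo {
    /**
     * Runs a "processing" thread that takes the lock once per element and sleeps
     * while holding it, plus a "checkpoint" thread that needs the same lock once.
     * Returns how many times the processing thread had taken the lock by the
     * time the checkpoint thread finally got it.
     */
    public static int acquisitionsBeforeCheckpoint(int elements, long sleepMs) {
        if (elements < 1) {
            throw new IllegalArgumentException("elements must be >= 1");
        }
        final Object lock = new Object();
        final AtomicInteger acquisitions = new AtomicInteger();
        final AtomicInteger seenByCheckpoint = new AtomicInteger();
        final CountDownLatch processingStarted = new CountDownLatch(1);

        Thread processing = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                synchronized (lock) {
                    acquisitions.incrementAndGet();
                    processingStarted.countDown();
                    try {
                        Thread.sleep(sleepMs); // stands in for the sleep in the ParDo
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // Lock released here and requested again right away.
            }
        });

        Thread checkpoint = new Thread(() -> {
            try {
                processingStarted.await(); // request the lock only once it is held
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (lock) {
                seenByCheckpoint.set(acquisitions.get());
            }
        });

        processing.start();
        checkpoint.start();
        try {
            processing.join();
            checkpoint.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return seenByCheckpoint.get();
    }

    public static void main(String[] args) {
        int n = acquisitionsBeforeCheckpoint(50, 20);
        System.out.println("Processing thread took the lock " + n
                + " time(s) before the checkpoint thread got it");
    }
}
```

The printed count varies between runs and JVMs, which is the point: nothing 
bounds how long the checkpoint thread has to wait.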

Without the sleep, the savepoint took ~280ms.


> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -----------------------------------------------------------------
>
>                 Key: BEAM-2393
>                 URL: https://issues.apache.org/jira/browse/BEAM-2393
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Grzegorz Kołakowski
>            Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when 
> the job restarts after a failure, it will send duplicate data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
