[ https://issues.apache.org/jira/browse/BEAM-2393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401721#comment-16401721 ]
Grzegorz Kołakowski edited comment on BEAM-2393 at 3/16/18 12:34 PM:
---------------------------------------------------------------------
The times are very low: usually <= 30ms, occasionally around 150ms.
I manually built Flink 1.4.0 with one additional log statement in
{{org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint}}:
{code:java}
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        // New line: logs the moment the checkpointing thread actually holds the task lock.
        LOG.debug("Lock acquired {}. task {}", lock, getName());
        // ... rest of the original method body, unchanged ...
    }
}
{code}
The corresponding logs are as follows:
{noformat}
2018-03-16 09:48:36,962 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (1) SAVEPOINT on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,953 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Lock acquired. task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,989 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 1 on task Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1)
2018-03-16 09:48:51,990 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished synchronous part of checkpoint 1.Alignment duration: 0 ms, snapshot duration 33 ms
2018-03-16 09:48:52,012 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> Window.Into()/Window.Assign.out (1/1) - finished asynchronous part of checkpoint 1. Asynchronous duration: 22 ms
{noformat}
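In my last try, the entire checkpoint took ~16s. Please note that the checkpointing thread spent ~15s of that waiting for the lock (09:48:36,962 to 09:48:51,953), while the synchronous and asynchronous parts took only 33 ms and 22 ms.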
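The phase durations can be recomputed directly from the DEBUG timestamps above. A minimal sketch in plain Java; the class and method names are made up for illustration, and only the timestamps are taken from the log excerpt:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: recomputes the checkpoint phases from the log timestamps quoted above.
final class CheckpointTimeline {
    // Timestamp format as it appears in the log excerpt, e.g. "2018-03-16 09:48:36,962".
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, FMT),
                                LocalDateTime.parse(to, FMT)).toMillis();
    }

    public static void main(String[] args) {
        String start     = "2018-03-16 09:48:36,962"; // Starting checkpoint (1) SAVEPOINT
        String locked    = "2018-03-16 09:48:51,953"; // Lock acquired
        String syncDone  = "2018-03-16 09:48:51,990"; // finished synchronous part
        String asyncDone = "2018-03-16 09:48:52,012"; // finished asynchronous part

        System.out.println("waiting for lock:   " + millisBetween(start, locked) + " ms");       // 14991 ms
        System.out.println("lock -> sync done:  " + millisBetween(locked, syncDone) + " ms");    // 37 ms
        System.out.println("sync -> async done: " + millisBetween(syncDone, asyncDone) + " ms"); // 22 ms
        System.out.println("total:              " + millisBetween(start, asyncDone) + " ms");    // 15050 ms
    }
}
```

The lock wait (14991 ms) accounts for almost all of the ~15s between the first and the last log line on this task; everything after the lock is acquired takes 59 ms.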
----
Let me briefly explain how I'm testing it. I have a text file that I process line by line. On each line I apply a ParDo that contains a short sleep, {{Thread.sleep(20)}}. Since I trigger the savepoint manually with {{bin/flink savepoint <jobId> <path>}}, the sleep slows the job down enough for me to run the command before the job finishes.
The thread responsible for taking the checkpoint waited 15s. According to my logs, the processing thread acquired the {{lock}} 738 times while the checkpointing thread was waiting for it. If each acquisition covers one 20 ms sleep, that is 738 × 20 ms ≈ 14.8s, i.e. nearly the whole wait.
Without the sleep, the savepoint took ~280ms.
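The 738 acquisitions suggest that the checkpointing thread is being starved: the processing thread releases the lock after each record and immediately takes it again, and Java's intrinsic {{synchronized}} monitors make no guarantee about which thread gets the lock next. The self-contained model below reproduces that pattern outside Flink. It is a hypothetical sketch, not Flink or Beam code: all names are invented, a non-fair {{ReentrantLock}} stands in for {{synchronized (lock)}}, and a fair {{ReentrantLock}} is included only for comparison.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model: a "processing" thread holds a lock for each record (sleeping like the
// ParDo does), while a "checkpoint" thread (the caller) needs the same lock.
final class CheckpointLockContention {

    /**
     * Returns how many times the processing thread acquired the lock after the checkpoint
     * thread started waiting for it. fair == false models a plain synchronized block.
     */
    static int acquisitionsWhileCheckpointWaits(boolean fair, int records, long sleepMillis) {
        if (records <= 0) throw new IllegalArgumentException("records must be positive");
        ReentrantLock lock = new ReentrantLock(fair);
        AtomicInteger acquisitions = new AtomicInteger();
        AtomicBoolean checkpointDone = new AtomicBoolean(false);
        CountDownLatch processingStarted = new CountDownLatch(1);

        Thread processor = new Thread(() -> {
            // Bounded input, like the text file: at most `records` elements are processed.
            for (int i = 0; i < records && !checkpointDone.get(); i++) {
                lock.lock();
                try {
                    acquisitions.incrementAndGet();
                    processingStarted.countDown();
                    Thread.sleep(sleepMillis); // stands in for Thread.sleep(20) in the ParDo
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock(); // released after every record, then re-acquired right away
                }
            }
        });
        processor.start();

        try {
            processingStarted.await();     // the processing thread has acquired the lock once
            int before = acquisitions.get();
            lock.lock();                   // the "checkpoint" blocks here
            int after;
            try {
                after = acquisitions.get();
                checkpointDone.set(true);  // stop processing once the checkpoint got the lock
            } finally {
                lock.unlock();
            }
            processor.join();
            return after - before;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("non-fair: " + acquisitionsWhileCheckpointWaits(false, 100, 20));
        System.out.println("fair:     " + acquisitionsWhileCheckpointWaits(true, 100, 20));
    }
}
```

With the fair lock the waiting thread typically gets in after at most one more record; with the non-fair lock nothing prevents the releasing thread from re-acquiring first, so the count can grow with the number of records. The exact numbers depend on the JVM and scheduler; the model only illustrates why the count can get so large.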
> BoundedSource is not fault-tolerant in FlinkRunner Streaming mode
> -----------------------------------------------------------------
>
> Key: BEAM-2393
> URL: https://issues.apache.org/jira/browse/BEAM-2393
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Jingsong Lee
> Assignee: Grzegorz Kołakowski
> Priority: Major
>
> {{BoundedSourceWrapper}} does not implement snapshot() and restore(), so when
> the job restarts after a failure, it sends duplicate data.
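In a simplified model where each split is a list of elements, the state that snapshot() and restore() would need to capture is just the number of elements already emitted from each split. The sketch below is hypothetical: {{ReplayableBoundedSource}} and its methods are invented and do not reflect the actual {{BoundedSourceWrapper}} code; a real implementation would need some way to persist, and after a restore skip to, the reader position of each split through Flink's checkpointing interfaces. Emission and the position update happen under the same lock as the snapshot, mirroring Flink's rule that sources emit records while holding the checkpoint lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a bounded source whose only checkpoint state is a per-split position.
final class ReplayableBoundedSource {
    private final List<List<String>> splits;
    private final int[] emitted;                  // per split: number of elements already emitted
    private final Object checkpointLock = new Object();

    ReplayableBoundedSource(List<List<String>> splits) {
        this.splits = splits;
        this.emitted = new int[splits.size()];
    }

    /** Emits up to maxElements elements; each emission and position update is atomic under the lock. */
    int run(Consumer<String> out, int maxElements) {
        int count = 0;
        for (int s = 0; s < splits.size(); s++) {
            List<String> split = splits.get(s);
            while (emitted[s] < split.size() && count < maxElements) {
                synchronized (checkpointLock) {
                    out.accept(split.get(emitted[s]));
                    emitted[s]++;
                }
                count++;
            }
        }
        return count;
    }

    /** snapshot(): copies the per-split positions under the same lock used for emission. */
    int[] snapshot() {
        synchronized (checkpointLock) {
            return emitted.clone();
        }
    }

    /** restore(): resumes each split from its checkpointed position instead of from the start. */
    void restore(int[] positions) {
        synchronized (checkpointLock) {
            System.arraycopy(positions, 0, emitted, 0, emitted.length);
        }
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(Arrays.asList("a", "b", "c"), Arrays.asList("d", "e"));
        List<String> out = new ArrayList<>();

        ReplayableBoundedSource first = new ReplayableBoundedSource(splits);
        first.run(out::add, 2);               // emits a, b
        int[] checkpoint = first.snapshot();  // [2, 0]; then the job "fails"

        ReplayableBoundedSource restarted = new ReplayableBoundedSource(splits);
        restarted.restore(checkpoint);
        restarted.run(out::add, Integer.MAX_VALUE);
        System.out.println(out);              // [a, b, c, d, e]: nothing is emitted twice
    }
}
```

Without the restore() call, the restarted instance would emit a and b a second time, which is the duplication described in the issue.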
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)