[ https://issues.apache.org/jira/browse/FLINK-32911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760333#comment-17760333 ]
Matthias Pohl commented on FLINK-32911:
---------------------------------------
Thanks for fixing the issue. I reran the test on
[6e4c2d1|https://github.com/apache/flink/commit/6e4c2d105a17322e1655d5760dd5967de1c866a5]
(and used Flink 1.17.1 as a baseline). I ran
[TopSpeedWindowing|https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java]
with a slightly different configuration on both checkouts (filesystem checkpoint
storage, a 20s checkpoint interval and the batch runtime mode):
{code:diff}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 3cca4cc8f8f..41204acfbcc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
@@ -24,6 +25,8 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.util.ratelimit.GuavaRateLimiter;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.connector.file.sink.FileSink;
@@ -61,7 +64,13 @@ public class TopSpeedWindowing {
 
         // Create the execution environment. This is the main entrypoint
         // to building a Flink application.
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final Configuration config = new Configuration();
+        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/checkpoints");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(20000);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 
         // Apache Flink's unified approach to stream and batch processing means that a DataStream
         // application executed over bounded input will produce the same final results regardless
{code}
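For anyone who wants to reproduce the behavior without patching the example, here is a minimal, self-contained sketch along the same lines (bounded source, filesystem checkpoint storage, deliberately long checkpoint interval). The class name and the /tmp path are placeholders, and it uses a simple fromSequence source instead of the example's file input:
{code:java}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Minimal sketch, not the patched example above; names and paths are placeholders. */
public class FinalCheckpointSmokeTest {

    public static void main(String[] args) throws Exception {
        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/final-checkpoint-test");

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // Deliberately long interval (10 minutes) so that no periodic checkpoint can fire
        // while the bounded input is being processed; only the final checkpoint remains.
        env.enableCheckpointing(10 * 60 * 1000L);

        // Bounded source + simple sink: with FLINK-28386 the job should trigger its final
        // checkpoint and terminate right after the sink has finished, instead of waiting
        // for the next periodic checkpoint.
        env.fromSequence(0, 1_000_000).print();

        env.execute("final-checkpoint-smoke-test");
    }
}
{code}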
The job ran on the [attached input data|https://issues.apache.org/jira/secure/attachment/13062368/data.txt.gz] (uncompressed before running!). I attached the logs of both runs to this Jira issue ([flink-1.17.1.log|https://issues.apache.org/jira/secure/attachment/13062596/flink-1.17.1.log] and [flink-1.18-6e4c2d1-SNAPSHOT.log|https://issues.apache.org/jira/secure/attachment/13062597/flink-1.18-6e4c2d1-SNAPSHOT.log]).
{code:title=flink-1.17.1.log}
[...]
2023-08-30 13:14:14,952 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
2023-08-30 13:14:33,525 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1693394073515 for job 35b1393706f05b74ff74355e4d080c78.
2023-08-30 13:14:33,695 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 35b1393706f05b74ff74355e4d080c78 (30002681 bytes, checkpointDuration=177 ms, finalizationTime=3 ms).
2023-08-30 13:14:33,697 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 1 as completed for source Source: file-input.
2023-08-30 13:14:33,698 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
[...]
{code}
{code:title=flink-1.18-6e4c2d1-SNAPSHOT.log}
[...]
2023-08-30 13:14:12,559 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
2023-08-30 13:14:12,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering a manual checkpoint for job 02520a50b9464b94d7fa6908929fef45.
2023-08-30 13:14:12,571 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1693394052565 for job 02520a50b9464b94d7fa6908929fef45.
2023-08-30 13:14:12,905 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 02520a50b9464b94d7fa6908929fef45 (30002705 bytes, checkpointDuration=337 ms, finalizationTime=3 ms).
2023-08-30 13:14:12,907 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 1 as completed for source Source: file-input.
2023-08-30 13:14:12,909 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
[...]
{code}
The logs show the difference in runtime behavior: on 1.17.1 there is a ~18.5s gap between the NoMoreSplits event (13:14:14,952) and the triggering of checkpoint 1 (13:14:33,525), i.e. the job still waits for the periodic checkpoint, whereas on 1.18 the JobMaster triggers a manual checkpoint right away (13:14:12,563, a few milliseconds after NoMoreSplits) with a corresponding INFO log message.
I consider this feature to be tested and working as expected. [~Jiang Xin], feel free to resolve this issue or ping me if you need me to do additional testing.
> Release Testing: Verify FLINK-28386: Trigger an immediate checkpoint after
> all sinks finished
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-32911
> URL: https://issues.apache.org/jira/browse/FLINK-32911
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.18.0
> Reporter: Jiang Xin
> Assignee: Matthias Pohl
> Priority: Major
> Fix For: 1.18.0
>
> Attachments: data.txt.gz, flink-1.17.1.log,
> flink-1.18-6e4c2d1-SNAPSHOT.log
>
>
> The feature is described in the
> [documentation|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#waiting-for-the-final-checkpoint-before-task-exit].
> To test the feature, we should run a Flink program on a bounded source and
> configure it with a large checkpoint interval. With this feature, the program
> should finish immediately once no more records need to be processed, instead
> of waiting for one more periodic checkpoint.