[ 
https://issues.apache.org/jira/browse/FLINK-32911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760333#comment-17760333
 ] 

Matthias Pohl commented on FLINK-32911:
---------------------------------------

Thanks for fixing the issue. I reran the test on 
[6e4c2d1|https://github.com/apache/flink/commit/6e4c2d105a17322e1655d5760dd5967de1c866a5]
 (and used Flink 1.17.1 as a baseline). I ran 
[TopSpeedWindowing|https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java]
 with a slightly different configuration (batch runtime mode plus filesystem 
checkpointing with a 20s interval) on both checkouts:
{code:diff}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 3cca4cc8f8f..41204acfbcc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
@@ -24,6 +25,8 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.util.ratelimit.GuavaRateLimiter;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import org.apache.flink.connector.file.sink.FileSink;
@@ -61,7 +64,13 @@ public class TopSpeedWindowing {
 
         // Create the execution environment. This is the main entrypoint
         // to building a Flink application.
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final Configuration config = new Configuration();
+        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///tmp/checkpoints");
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(20000);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 
         // Apache Flink’s unified approach to stream and batch processing means that a DataStream
         // application executed over bounded input will produce the same final results regardless
{code}

The job ran on the [attached (uncompressed!) input 
data|https://issues.apache.org/jira/secure/attachment/13062368/data.txt.gz]. I 
attached the logs of both runs to this Jira issue 
([flink-1.17.1.log|https://issues.apache.org/jira/secure/attachment/13062596/flink-1.17.1.log]
 and 
[flink-1.18-6e4c2d1-SNAPSHOT.log|https://issues.apache.org/jira/secure/attachment/13062597/flink-1.18-6e4c2d1-SNAPSHOT.log]).

{code:title=flink-1.17.1.log}
[...]
2023-08-30 13:14:14,952 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
2023-08-30 13:14:33,525 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1693394073515 for job 35b1393706f05b74ff74355e4d080c78.
2023-08-30 13:14:33,695 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 1 for job 35b1393706f05b74ff74355e4d080c78 (30002681 bytes, checkpointDuration=177 ms, finalizationTime=3 ms).
2023-08-30 13:14:33,697 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 1 as completed for source Source: file-input.
2023-08-30 13:14:33,698 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
[...]
{code}

{code:title=flink-1.18-6e4c2d1-SNAPSHOT.log}
[...]
2023-08-30 13:14:12,559 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
2023-08-30 13:14:12,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering a manual checkpoint for job 02520a50b9464b94d7fa6908929fef45.
2023-08-30 13:14:12,571 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1693394052565 for job 02520a50b9464b94d7fa6908929fef45.
2023-08-30 13:14:12,905 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 1 for job 02520a50b9464b94d7fa6908929fef45 (30002705 bytes, checkpointDuration=337 ms, finalizationTime=3 ms).
2023-08-30 13:14:12,907 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 1 as completed for source Source: file-input.
2023-08-30 13:14:12,909 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
[...]
{code}

The logs show the difference in runtime: 1.17.1 has a gap of about 18.6s between 
the NoMoreSplits event and the next periodic checkpoint, whereas 1.18 triggers 
a manual checkpoint within a few milliseconds and logs a corresponding INFO 
message from the JobMaster.
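
For anyone who wants to reproduce the comparison from the attached logs, here is a minimal sketch that computes the gap between the "Reader received NoMoreSplits event" line and the CheckpointCoordinator's "Triggering checkpoint 1" line. The class name {{CheckpointGap}} and the helper {{gapMillis}} are made up for illustration and are not part of Flink; the timestamps are copied from the log excerpts above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for this comment only; not part of Flink.
class CheckpointGap {
    // Timestamp layout at the start of each log line in the excerpts above.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Returns the number of milliseconds between two log timestamps. */
    static long gapMillis(String from, String to) {
        return Duration.between(
                        LocalDateTime.parse(from, LOG_TS),
                        LocalDateTime.parse(to, LOG_TS))
                .toMillis();
    }

    public static void main(String[] args) {
        // 1.17.1: NoMoreSplits event -> periodic checkpoint triggered
        long baseline = gapMillis("2023-08-30 13:14:14,952", "2023-08-30 13:14:33,525");
        // 1.18 snapshot: NoMoreSplits event -> checkpoint triggered
        long snapshot = gapMillis("2023-08-30 13:14:12,559", "2023-08-30 13:14:12,571");

        System.out.println("1.17.1 gap: " + baseline + " ms");
        System.out.println("1.18-6e4c2d1 gap: " + snapshot + " ms");
    }
}
```

With the timestamps above this gives 18573 ms for 1.17.1 and 12 ms for the 1.18 snapshot.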

I consider this issue to be resolved and tested. [~Jiang Xin] feel free to 
resolve this issue or ping me if you need me to do additional testing.

> Release Testing: Verify FLINK-28386: Trigger an immediate checkpoint after 
> all sinks finished
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32911
>                 URL: https://issues.apache.org/jira/browse/FLINK-32911
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.18.0
>            Reporter: Jiang Xin
>            Assignee: Matthias Pohl
>            Priority: Major
>             Fix For: 1.18.0
>
>         Attachments: data.txt.gz, flink-1.17.1.log, 
> flink-1.18-6e4c2d1-SNAPSHOT.log
>
>
> The feature is described in the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#waiting-for-the-final-checkpoint-before-task-exit]
>  .
> To test the feature, we should run a Flink program on a bounded source and 
> configure it with a large checkpoint interval. With this feature, the program 
> should finish immediately when no more records need to be processed, instead 
> of waiting for one more periodic checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
