kristoffSC commented on code in PR #21052:
URL: https://github.com/apache/flink/pull/21052#discussion_r997505723


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java:
##########
@@ -113,6 +122,69 @@ public void init() {
         GLOBAL_COMMIT_QUEUE.clear();
     }
 
+    /**
+     * This test executes Sink operator with committer and global committer. 
The global committer
+     * throws exception on 3rd checkpoint (commitNumberToFailOn == 2). The 
local mini cluster is
+     * executing the recovery, we should expect no data loss. In this 
particular setup unique number
+     * of rows persisted by committer should be same as unique number of rows 
persisted by
+     * GlobalCommitter.
+     */
+    @Test
+    public void testGlobalCommitterNotMissingRecordsDuringRecovery() throws 
Exception {
+
+        final StreamExecutionEnvironment env = buildStreamEnv();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
+        env.enableCheckpointing(5000L);

Review Comment:
   yes, I've changed it to 1s, with `numberOfCheckpoints` set 5 it give us min 
5s duration I Guess. 
   
   I dont want to tight up the checkpoint interval here, since source has to 
send fixed number of records within one checkpoint. I don't want to push things 
to the edge here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to