gaoyunhaii commented on a change in pull request #17131:
URL: https://github.com/apache/flink/pull/17131#discussion_r702647767



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -932,6 +932,53 @@ public void 
testTimeoutAlignmentAfterReceivedEndOfPartition() throws Exception {
         }
     }
 
+    /**
+     * This test verifies a special case that the checkpoint handler starts 
the new checkpoint via
+     * received barrier announcement from the first channel, then {@link 
EndOfPartitionEvent} from
+     * the second channel and then the barrier from the first channel. In this 
case we should
+     * ensures the {@link 
SingleCheckpointBarrierHandler#markAlignmentStart(long, long)} should be
+     * called. More information is available in 
https://issues.apache.org/jira/browse/FLINK-24068.
+     */
+    @Test
+    public void testStartNewCheckpointViaAnnouncement() throws Exception {
+        int numChannels = 3;
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        long alignmentTimeOut = 10000L;
+
+        try (CheckpointedInputGate gate =
+                new TestCheckpointedInputGateBuilder(3, 
getTestBarrierHandlerFactory(target))
+                        .withRemoteChannels()
+                        .withMailboxExecutor()
+                        .build()) {
+            getChannel(gate, 0)
+                    .onBuffer(
+                            barrier(
+                                    1,
+                                    clock.relativeTimeMillis(),
+                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                            0,
+                            0);
+            getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
+
+            assertAnnouncement(gate);
+            assertEvent(gate, EndOfPartitionEvent.class);
+            assertBarrier(gate);
+
+            getChannel(gate, 2)
+                    .onBuffer(
+                            barrier(
+                                    1,
+                                    clock.relativeTimeMillis(),
+                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                            0,
+                            0);
+            assertAnnouncement(gate);
+            assertBarrier(gate);
+
+            assertThat(target.triggeredCheckpoints, contains(1L));

Review comment:
       The test would throw the exception during `markAlignmentEnd` since the 
duration would be a negative number due to overflow. I enhanced the test to add 
the explicit check that markAlignmentStart is called after received the first 
`EndOfPartitionEvent`.  

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -932,6 +932,53 @@ public void 
testTimeoutAlignmentAfterReceivedEndOfPartition() throws Exception {
         }
     }
 
+    /**
+     * This test verifies a special case that the checkpoint handler starts 
the new checkpoint via
+     * received barrier announcement from the first channel, then {@link 
EndOfPartitionEvent} from
+     * the second channel and then the barrier from the first channel. In this 
case we should
+     * ensures the {@link 
SingleCheckpointBarrierHandler#markAlignmentStart(long, long)} should be
+     * called. More information is available in 
https://issues.apache.org/jira/browse/FLINK-24068.
+     */
+    @Test
+    public void testStartNewCheckpointViaAnnouncement() throws Exception {
+        int numChannels = 3;
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        long alignmentTimeOut = 10000L;
+
+        try (CheckpointedInputGate gate =
+                new TestCheckpointedInputGateBuilder(3, 
getTestBarrierHandlerFactory(target))
+                        .withRemoteChannels()
+                        .withMailboxExecutor()
+                        .build()) {
+            getChannel(gate, 0)
+                    .onBuffer(
+                            barrier(
+                                    1,
+                                    clock.relativeTimeMillis(),
+                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                            0,
+                            0);
+            getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
+
+            assertAnnouncement(gate);
+            assertEvent(gate, EndOfPartitionEvent.class);
+            assertBarrier(gate);
+
+            getChannel(gate, 2)
+                    .onBuffer(
+                            barrier(
+                                    1,
+                                    clock.relativeTimeMillis(),
+                                    alignedWithTimeout(getDefault(), 
alignmentTimeOut)),
+                            0,
+                            0);
+            assertAnnouncement(gate);
+            assertBarrier(gate);
+
+            assertThat(target.triggeredCheckpoints, contains(1L));

Review comment:
       The test would throw the exception during `markAlignmentEnd` since the 
duration would be a negative number due to overflow. I enhanced the test to add 
the explicit check that markAlignmentStart is called after received the first 
`EndOfPartitionEvent`~




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to