dawidwys commented on a change in pull request #17131:
URL: https://github.com/apache/flink/pull/17131#discussion_r701802303
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -932,6 +932,53 @@ public void
testTimeoutAlignmentAfterReceivedEndOfPartition() throws Exception {
}
}
+ /**
+ * This test verifies a special case that the checkpoint handler starts
the new checkpoint via
+ * received barrier announcement from the first channel, then {@link
EndOfPartitionEvent} from
+ * the second channel and then the barrier from the first channel. In this
case we should
+ * ensures the {@link
SingleCheckpointBarrierHandler#markAlignmentStart(long, long)} should be
Review comment:
```suggestion
* ensure the {@link
SingleCheckpointBarrierHandler#markAlignmentStart(long, long)} should be
```
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -932,6 +932,53 @@ public void
testTimeoutAlignmentAfterReceivedEndOfPartition() throws Exception {
}
}
+ /**
+ * This test verifies a special case that the checkpoint handler starts
the new checkpoint via
+ * received barrier announcement from the first channel, then {@link
EndOfPartitionEvent} from
+ * the second channel and then the barrier from the first channel. In this
case we should
+ * ensures the {@link
SingleCheckpointBarrierHandler#markAlignmentStart(long, long)} should be
+ * called. More information is available in
https://issues.apache.org/jira/browse/FLINK-24068.
+ */
+ @Test
+ public void testStartNewCheckpointViaAnnouncement() throws Exception {
+ int numChannels = 3;
+ ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+ long alignmentTimeOut = 10000L;
+
+ try (CheckpointedInputGate gate =
+ new TestCheckpointedInputGateBuilder(3,
getTestBarrierHandlerFactory(target))
+ .withRemoteChannels()
+ .withMailboxExecutor()
+ .build()) {
+ getChannel(gate, 0)
+ .onBuffer(
+ barrier(
+ 1,
+ clock.relativeTimeMillis(),
+ alignedWithTimeout(getDefault(),
alignmentTimeOut)),
+ 0,
+ 0);
+ getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
+
+ assertAnnouncement(gate);
+ assertEvent(gate, EndOfPartitionEvent.class);
+ assertBarrier(gate);
+
+ getChannel(gate, 2)
+ .onBuffer(
+ barrier(
+ 1,
+ clock.relativeTimeMillis(),
+ alignedWithTimeout(getDefault(),
alignmentTimeOut)),
+ 0,
+ 0);
+ assertAnnouncement(gate);
+ assertBarrier(gate);
+
+ assertThat(target.triggeredCheckpoints, contains(1L));
Review comment:
How do we check the `markAlignmentStart` has been called? Shouldn't we
check the value of the alignment time?
How does the test behave without your change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]