gaoyunhaii commented on a change in pull request #17131:
URL: https://github.com/apache/flink/pull/17131#discussion_r702647767
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -932,6 +932,53 @@ public void testTimeoutAlignmentAfterReceivedEndOfPartition() throws Exception {
}
}
+ /**
+ * This test verifies a special case that the checkpoint handler starts the new checkpoint via
+ * received barrier announcement from the first channel, then {@link EndOfPartitionEvent} from
+ * the second channel and then the barrier from the first channel. In this case we should
+ * ensures the {@link SingleCheckpointBarrierHandler#markAlignmentStart(long, long)} should be
+ * called. More information is available in https://issues.apache.org/jira/browse/FLINK-24068.
+ */
+ @Test
+ public void testStartNewCheckpointViaAnnouncement() throws Exception {
+ int numChannels = 3;
+ ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+ long alignmentTimeOut = 10000L;
+
+ try (CheckpointedInputGate gate =
+ new TestCheckpointedInputGateBuilder(3, getTestBarrierHandlerFactory(target))
+ .withRemoteChannels()
+ .withMailboxExecutor()
+ .build()) {
+ getChannel(gate, 0)
+ .onBuffer(
+ barrier(
+ 1,
+ clock.relativeTimeMillis(),
+ alignedWithTimeout(getDefault(), alignmentTimeOut)),
+ 0,
+ 0);
+ getChannel(gate, 1).onBuffer(endOfPartition(), 0, 0);
+
+ assertAnnouncement(gate);
+ assertEvent(gate, EndOfPartitionEvent.class);
+ assertBarrier(gate);
+
+ getChannel(gate, 2)
+ .onBuffer(
+ barrier(
+ 1,
+ clock.relativeTimeMillis(),
+ alignedWithTimeout(getDefault(), alignmentTimeOut)),
+ 0,
+ 0);
+ assertAnnouncement(gate);
+ assertBarrier(gate);
+
+ assertThat(target.triggeredCheckpoints, contains(1L));
Review comment:
The test would throw an exception in `markAlignmentEnd`, since the duration
would be negative due to overflow. I enhanced the test with an explicit check
that `markAlignmentStart` is called after the first `EndOfPartitionEvent` is
received.
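
A minimal sketch of the overflow mentioned above, assuming the handler keeps
the alignment start in a field that is initialized to a sentinel such as
`Long.MIN_VALUE`. The class name, the field, the sentinel value, and the
single-argument method signatures are hypothetical and chosen for illustration
only; they are not copied from `SingleCheckpointBarrierHandler`.

```java
// Hypothetical stand-in for a barrier handler's alignment bookkeeping.
final class AlignmentOverflowSketch {
    // Assumed sentinel meaning "no alignment in progress" (an assumption, not Flink's code).
    static final long OUTSIDE_OF_ALIGNMENT = Long.MIN_VALUE;

    private long startOfAlignment = OUTSIDE_OF_ALIGNMENT;

    // Records when the alignment began.
    void markAlignmentStart(long now) {
        startOfAlignment = now;
    }

    // Computes the alignment duration and rejects negative values.
    long markAlignmentEnd(long now) {
        // If markAlignmentStart was skipped, this subtracts Long.MIN_VALUE
        // from a positive timestamp, which wraps around to a negative long.
        long duration = now - startOfAlignment;
        if (duration < 0) {
            throw new IllegalStateException("Alignment duration is negative: " + duration);
        }
        startOfAlignment = OUTSIDE_OF_ALIGNMENT;
        return duration;
    }

    public static void main(String[] args) {
        AlignmentOverflowSketch started = new AlignmentOverflowSketch();
        started.markAlignmentStart(1_000L);
        System.out.println("duration = " + started.markAlignmentEnd(1_250L));

        AlignmentOverflowSketch neverStarted = new AlignmentOverflowSketch();
        try {
            neverStarted.markAlignmentEnd(1_250L);
        } catch (IllegalStateException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, ending an alignment that was never started fails on
the negative duration, which is the kind of failure the enhanced test is meant
to surface.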