dawidwys commented on a change in pull request #15313:
URL: https://github.com/apache/flink/pull/15313#discussion_r603341493
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -376,119 +586,26 @@ public void testTimeoutAlignmentOnUnalignedCheckpoint()
throws Exception {
assertBarrier(gate);
assertEquals(
- channelStateWriter.getAddedInput().get(getChannel(gate,
1).getChannelInfo()).size(),
- 2);
+ 2,
+ channelStateWriter
+ .getAddedInput()
+ .get(getChannel(gate, 1).getChannelInfo())
+ .size());
assertEquals(1, target.getTriggeredCheckpointCounter());
}
private RemoteInputChannel getChannel(CheckpointedInputGate gate, int
channelIndex) {
return (RemoteInputChannel) gate.getChannel(channelIndex);
}
- @Test
- public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws
Exception {
- testTimeoutAlignmentConsistency(true, false, false);
- }
-
- @Test
- public void testTimeoutAlignmentConsistencyOnProcessBarrier() throws
Exception {
- testTimeoutAlignmentConsistency(false, true, false);
- }
-
- @Test
- public void testTimeoutAlignmentConsistencyOnPostProcessBarrier() throws
Exception {
- testTimeoutAlignmentConsistency(false, false, true);
- }
-
- public void testTimeoutAlignmentConsistency(
- boolean sleepBeforePreProcess,
- boolean sleepBeforeProcess,
- boolean sleepBeforePostProcess)
- throws Exception {
- SingleInputGate gate = new
SingleInputGateBuilder().setNumberOfChannels(1).build();
- TestInputChannel channel0 = new TestInputChannel(gate, 0, false, true);
- gate.setInputChannels(channel0);
-
- RecordingChannelStateWriter channelStateWriter = new
RecordingChannelStateWriter();
- AlternatingController controller =
- new AlternatingController(
- new AlignedController(gate),
- new UnalignedController(
- new
TestSubtaskCheckpointCoordinator(channelStateWriter), gate),
- clock);
-
- long alignmentTimeout = 10;
- CheckpointBarrier barrier =
- new CheckpointBarrier(
- 1, clock.relativeTimeMillis(),
alignedNoTimeout(CHECKPOINT, getDefault()));
-
- InputChannelInfo channelInfo = channel0.getChannelInfo();
-
- controller.preProcessFirstBarrierOrAnnouncement(barrier);
- controller.barrierAnnouncement(channelInfo, barrier, 1);
-
- if (sleepBeforePreProcess) {
- clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
- }
- Optional<CheckpointBarrier> preProcessTrigger =
- controller.preProcessFirstBarrier(channelInfo, barrier);
- if (sleepBeforeProcess) {
- clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
- }
- Optional<CheckpointBarrier> processTrigger =
- controller.barrierReceived(channelInfo, barrier);
- if (sleepBeforePostProcess) {
- clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
- }
- Optional<CheckpointBarrier> postProcessTrigger =
- controller.postProcessLastBarrier(channelInfo, barrier);
-
- int triggeredCount = 0;
- boolean unalignedCheckpoint = false;
- if (preProcessTrigger.isPresent()) {
- triggeredCount++;
- unalignedCheckpoint =
-
preProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
- assertTrue(unalignedCheckpoint);
- }
- if (processTrigger.isPresent()) {
- triggeredCount++;
- unalignedCheckpoint =
-
processTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
- assertTrue(unalignedCheckpoint);
- }
- if (postProcessTrigger.isPresent()) {
- triggeredCount++;
- unalignedCheckpoint =
-
postProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
- }
-
- assertEquals(
- String.format(
- "Checkpoint should be triggered exactly once, but [%s,
%s, %s] was found instead",
- preProcessTrigger.isPresent(),
- processTrigger.isPresent(),
- postProcessTrigger.isPresent()),
- 1,
- triggeredCount);
-
- if (unalignedCheckpoint) {
- // check that we can add output data if we are in unaligned
checkpoint mode. In other
- // words
- // if the state writer has been initialised correctly.
- assertEquals(barrier.getId(),
channelStateWriter.getLastStartedCheckpointId());
- }
- }
Review comment:
I checked the test again. The way I understand it, it verified that only
one of the methods `preProcessFirstBarrier`, `barrierReceived`,
`postProcessLastBarrier` triggered a checkpoint.
It does not verify any of the intrinsic characteristics of the checkpoint
itself, the way I understand it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org