dawidwys commented on a change in pull request #15313:
URL: https://github.com/apache/flink/pull/15313#discussion_r603341493



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
##########
@@ -376,119 +586,26 @@ public void testTimeoutAlignmentOnUnalignedCheckpoint() 
throws Exception {
         assertBarrier(gate);
 
         assertEquals(
-                channelStateWriter.getAddedInput().get(getChannel(gate, 
1).getChannelInfo()).size(),
-                2);
+                2,
+                channelStateWriter
+                        .getAddedInput()
+                        .get(getChannel(gate, 1).getChannelInfo())
+                        .size());
         assertEquals(1, target.getTriggeredCheckpointCounter());
     }
 
     private RemoteInputChannel getChannel(CheckpointedInputGate gate, int 
channelIndex) {
         return (RemoteInputChannel) gate.getChannel(channelIndex);
     }
 
-    @Test
-    public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws 
Exception {
-        testTimeoutAlignmentConsistency(true, false, false);
-    }
-
-    @Test
-    public void testTimeoutAlignmentConsistencyOnProcessBarrier() throws 
Exception {
-        testTimeoutAlignmentConsistency(false, true, false);
-    }
-
-    @Test
-    public void testTimeoutAlignmentConsistencyOnPostProcessBarrier() throws 
Exception {
-        testTimeoutAlignmentConsistency(false, false, true);
-    }
-
-    public void testTimeoutAlignmentConsistency(
-            boolean sleepBeforePreProcess,
-            boolean sleepBeforeProcess,
-            boolean sleepBeforePostProcess)
-            throws Exception {
-        SingleInputGate gate = new 
SingleInputGateBuilder().setNumberOfChannels(1).build();
-        TestInputChannel channel0 = new TestInputChannel(gate, 0, false, true);
-        gate.setInputChannels(channel0);
-
-        RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
-        AlternatingController controller =
-                new AlternatingController(
-                        new AlignedController(gate),
-                        new UnalignedController(
-                                new 
TestSubtaskCheckpointCoordinator(channelStateWriter), gate),
-                        clock);
-
-        long alignmentTimeout = 10;
-        CheckpointBarrier barrier =
-                new CheckpointBarrier(
-                        1, clock.relativeTimeMillis(), 
alignedNoTimeout(CHECKPOINT, getDefault()));
-
-        InputChannelInfo channelInfo = channel0.getChannelInfo();
-
-        controller.preProcessFirstBarrierOrAnnouncement(barrier);
-        controller.barrierAnnouncement(channelInfo, barrier, 1);
-
-        if (sleepBeforePreProcess) {
-            clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
-        }
-        Optional<CheckpointBarrier> preProcessTrigger =
-                controller.preProcessFirstBarrier(channelInfo, barrier);
-        if (sleepBeforeProcess) {
-            clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
-        }
-        Optional<CheckpointBarrier> processTrigger =
-                controller.barrierReceived(channelInfo, barrier);
-        if (sleepBeforePostProcess) {
-            clock.advanceTime(alignmentTimeout * 2, TimeUnit.MILLISECONDS);
-        }
-        Optional<CheckpointBarrier> postProcessTrigger =
-                controller.postProcessLastBarrier(channelInfo, barrier);
-
-        int triggeredCount = 0;
-        boolean unalignedCheckpoint = false;
-        if (preProcessTrigger.isPresent()) {
-            triggeredCount++;
-            unalignedCheckpoint =
-                    
preProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
-            assertTrue(unalignedCheckpoint);
-        }
-        if (processTrigger.isPresent()) {
-            triggeredCount++;
-            unalignedCheckpoint =
-                    
processTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
-            assertTrue(unalignedCheckpoint);
-        }
-        if (postProcessTrigger.isPresent()) {
-            triggeredCount++;
-            unalignedCheckpoint =
-                    
postProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
-        }
-
-        assertEquals(
-                String.format(
-                        "Checkpoint should be triggered exactly once, but [%s, 
%s, %s] was found instead",
-                        preProcessTrigger.isPresent(),
-                        processTrigger.isPresent(),
-                        postProcessTrigger.isPresent()),
-                1,
-                triggeredCount);
-
-        if (unalignedCheckpoint) {
-            // check that we can add output data if we are in unaligned 
checkpoint mode. In other
-            // words
-            // if the state writer has been initialised correctly.
-            assertEquals(barrier.getId(), 
channelStateWriter.getLastStartedCheckpointId());
-        }
-    }

Review comment:
       I checked the test again. The way I understand it it verified that only 
one of the methods `preProcessFirstBarrier`, `barrierReceived`, 
`postProcessLastBarrier` triggered checkpoint.
   
   It does not verify any of intrinsic characteristics of checkpoints itself 
the way I understand it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to