zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] 
Avoid caching buffers for blocked input channels before barrier alignment
URL: https://github.com/apache/flink/pull/11351#discussion_r408960994
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
 ##########
 @@ -900,247 +580,185 @@ public void testMultiChannelAbortCheckpoint() throws 
Exception {
                                // some buffers and a successful checkpoint
                        /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
                        /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
-                       /* 5 */ createBuffer(2), createBuffer(1),
-                       /* 7 */ createBarrier(1, 0),
-                       /* 8 */ createBuffer(0), createBuffer(2),
+                       /* 5 */ createBuffer(0),
+                       /* 6 */ createBarrier(1, 0),
+                       /* 7 */ createBuffer(0), createBuffer(2),
 
                                // aborted on last barrier
-                       /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
-                       /* 12 */ createBuffer(0), createBuffer(2),
-                       /* 14 */ createCancellationBarrier(2, 1),
+                       /* 9 */  createBarrier(2, 0), createBarrier(2, 2),
+                       /* 11 */ createBuffer(1),
+                       /* 12 */ createCancellationBarrier(2, 1),
 
                                // successful checkpoint
-                       /* 15 */ createBuffer(2), createBuffer(1),
-                       /* 17 */ createBarrier(3, 1), createBarrier(3, 2), 
createBarrier(3, 0),
+                       /* 13 */ createBuffer(2), createBuffer(1),
+                       /* 15 */ createBarrier(3, 1), createBarrier(3, 2), 
createBarrier(3, 0),
 
                                // abort on first barrier
-                       /* 20 */ createBuffer(0), createBuffer(1),
-                       /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
-                       /* 24 */ createBuffer(0),
-                       /* 25 */ createBarrier(4, 0),
+                       /* 18 */ createBuffer(0), createBuffer(1),
+                       /* 20 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
+                       /* 22 */ createBuffer(2),
+                       /* 23 */ createBarrier(4, 0),
 
                                // another successful checkpoint
-                       /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
-                       /* 29 */ createBarrier(5, 2), createBarrier(5, 1), 
createBarrier(5, 0),
-                       /* 32 */ createBuffer(0), createBuffer(1),
+                       /* 24 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+                       /* 27 */ createBarrier(5, 2), createBarrier(5, 1), 
createBarrier(5, 0),
+                       /* 30 */ createBuffer(0), createBuffer(1),
 
                                // abort multiple cancellations and a barrier 
after the cancellations
-                       /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
-                       /* 36 */ createBarrier(6, 0),
+                       /* 32 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
+                       /* 34 */ createBarrier(6, 0),
 
-                       /* 37 */ createBuffer(0)
+                       /* 35 */ createBuffer(0)
                };
-               AbstractInvokable toNotify = mock(AbstractInvokable.class);
+               ValidatingCheckpointHandler toNotify = new 
ValidatingCheckpointHandler();
                inputGate = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
                // successful first checkpoint, with some aligned buffers
+               toNotify.setNextExpectedCheckpointId(1);
                check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
                check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
                check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
                startTs = System.nanoTime();
                check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
                validateAlignmentTime(startTs, inputGate);
+               Integer[] expectedUnblockedChannels1 = new Integer[] {0, 1, 2};
+               assertArrayEquals(expectedUnblockedChannels1, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
 
-               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
                check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
 
                // canceled checkpoint on last barrier
-               startTs = System.nanoTime();
-               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
-                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               validateAlignmentTime(startTs, inputGate);
                check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(2, toNotify.getLastCanceledCheckpointId());
+               Integer[] expectedUnblockedChannels2 = new Integer[] {0, 2};
+               assertArrayEquals(expectedUnblockedChannels2, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
+               
assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER,
 toNotify.getCheckpointFailureReason());
+               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
 
                // one more successful checkpoint
-               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
                startTs = System.nanoTime();
-               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
+               toNotify.setNextExpectedCheckpointId(3);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
                validateAlignmentTime(startTs, inputGate);
-               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+               Integer[] expectedUnblockedChannels3 = new Integer[] {0, 1, 2};
+               assertArrayEquals(expectedUnblockedChannels3, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
 
                // this checkpoint gets immediately canceled
-               check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
-                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
+               check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(4, toNotify.getLastCanceledCheckpointId());
+               
assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER,
 toNotify.getCheckpointFailureReason());
                assertEquals(0L, inputGate.getAlignmentDurationNanos());
+               Integer[] expectedUnblockedChannels4 = new Integer[] {2};
+               assertArrayEquals(expectedUnblockedChannels4, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
 
                // some buffers
+               check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
+               Integer[] expectedUnblockedChannels5 = new Integer[] {0};
+               assertArrayEquals(expectedUnblockedChannels5, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
+               check(sequence[25], inputGate.pollNext().get(), PAGE_SIZE);
                check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
 
                // a simple successful checkpoint
                startTs = System.nanoTime();
-               check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               validateAlignmentTime(startTs, inputGate);
-               check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
-
-               check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
-                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               assertEquals(0L, inputGate.getAlignmentDurationNanos());
-       }
-
-       @Test
-       public void testAbortViaQueuedBarriers() throws Exception {
-               BufferOrEvent[] sequence = {
-                               // starting a checkpoint
-                       /* 0 */ createBuffer(1),
-                       /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
-                       /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
-
-                               // queued barrier and cancellation barrier
-                       /* 6 */ createCancellationBarrier(2, 2),
-                       /* 7 */ createBarrier(2, 1),
-
-                               // some intermediate buffers (some queued)
-                       /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
-
-                               // complete initial checkpoint
-                       /* 11 */ createBarrier(1, 0),
-
-                               // some buffers (none queued, since checkpoint 
is aborted)
-                       /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
-
-                               // final barrier of aborted checkpoint
-                       /* 15 */ createBarrier(2, 0),
-
-                               // some more buffers
-                       /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
-               };
-               AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               inputGate = createBarrierBuffer(3, sequence, toNotify);
-
-               long startTs;
-
-               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
-
-               // starting first checkpoint
-               startTs = System.nanoTime();
-               check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
-
-               // finished first checkpoint
-               check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
+               toNotify.setNextExpectedCheckpointId(5);
+               check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
                validateAlignmentTime(startTs, inputGate);
+               Integer[] expectedUnblockedChannels6 = new Integer[] {0, 1, 2};
+               assertArrayEquals(expectedUnblockedChannels6, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
+               check(sequence[31], inputGate.pollNext().get(), PAGE_SIZE);
 
-               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
-
-               // re-read the queued cancellation barriers
-               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
-                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               assertEquals(0L, inputGate.getAlignmentDurationNanos());
-
-               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
-
-               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
-
-               // no further alignment should have happened
+               check(sequence[35], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(6, toNotify.getLastCanceledCheckpointId());
+               
assertEquals(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER,
 toNotify.getCheckpointFailureReason());
                assertEquals(0L, inputGate.getAlignmentDurationNanos());
+               Integer[] expectedUnblockedChannels7 = new Integer[] {0};
+               assertArrayEquals(expectedUnblockedChannels7, 
mockInputGate.getAndResetLastUnblockedChannels().toArray());
 
-               // no further checkpoint (abort) notifications
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(),
-                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
+               assertEquals(3, toNotify.getTriggerCheckpointCounter());
+               assertEquals(3, toNotify.getAbortCheckpointCounter());
        }
 
        /**
-        * This tests the where a replay of queued checkpoint barriers meets
-        * a canceled checkpoint.
+        * This tests the where a checkpoint barriers meets a canceled 
checkpoint.
 
 Review comment:
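   Typo in this Javadoc: "This tests the where ..." is missing a noun. Should it read "This tests the case where a checkpoint barrier meets a canceled checkpoint"?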

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to