This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d502b6addffdc23a4826796c630bf7f9dbae718
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Jun 18 10:41:11 2019 +0200

    [FLINK-12777][network] Rename existing classes to make them in sync with 
the refactor
    
    1. Rename BarrierBuffer to CheckpointedInputGate
    CheckpointedInputGate was an interface, while BarrierBuffer was
    its implementation. This rename means that we are dropping the interface
    and keeping only the concrete class.
    
    2. Rename BarrierBuffer and BarrierTracker tests to match this rename
    and previous refactorings.
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 243 --------
 .../streaming/runtime/io/CachedBufferStorage.java  |   2 +-
 .../runtime/io/CheckpointedInputGate.java          | 228 ++++++-
 .../streaming/runtime/io/InputProcessorUtil.java   |  12 +-
 .../streaming/runtime/io/StreamInputProcessor.java |   2 +-
 .../runtime/io/StreamTaskNetworkInput.java         |  22 +-
 .../runtime/io/StreamTwoInputProcessor.java        |   2 +-
 .../io/StreamTwoInputSelectableProcessor.java      |   4 +-
 ...heckpointBarrierAlignerAlignmentLimitTest.java} |  15 +-
 ...CheckpointBarrierAlignerMassiveRandomTest.java} |   6 +-
 ....java => CheckpointBarrierAlignerTestBase.java} | 660 ++++++++++-----------
 ...Test.java => CheckpointBarrierTrackerTest.java} |  54 +-
 ...> CreditBasedCheckpointBarrierAlignerTest.java} |   8 +-
 ...a => SpillingCheckpointBarrierAlignerTest.java} |   8 +-
 .../tasks/StreamTaskCancellationBarrierTest.java   |   7 +-
 15 files changed, 614 insertions(+), 659 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
deleted file mode 100644
index 8dcc005..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The barrier buffer is {@link CheckpointedInputGate} that blocks inputs with 
barriers until
- * all inputs have received the barrier for a given checkpoint.
- *
- * <p>To avoid back-pressuring the input streams (which may cause distributed 
deadlocks), the
- * BarrierBuffer continues receiving buffers from the blocked channels and 
stores them internally until
- * the blocks are released.
- */
-@Internal
-public class BarrierBuffer implements CheckpointedInputGate {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
-
-       private final CheckpointBarrierHandler barrierHandler;
-
-       /** The gate that the buffer draws its input from. */
-       private final InputGate inputGate;
-
-       private final BufferStorage bufferStorage;
-
-       /** Flag to indicate whether we have drawn all available input. */
-       private boolean endOfInputGate;
-
-       /** Indicate end of the input. Set to true after encountering {@link 
#endOfInputGate} and depleting
-        * {@link #bufferStorage}. */
-       private boolean isFinished;
-
-       /**
-        * Creates a new checkpoint stream aligner.
-        *
-        * <p>There is no limit to how much data may be buffered during an 
alignment.
-        *
-        * @param inputGate The input gate to draw the buffers and events from.
-        * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
-        */
-       @VisibleForTesting
-       BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
-               this (inputGate, bufferStorage, "Testing: No task associated", 
null);
-       }
-
-       BarrierBuffer(
-                       InputGate inputGate,
-                       BufferStorage bufferStorage,
-                       String taskName,
-                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
-               this(
-                       inputGate,
-                       bufferStorage,
-                       new CheckpointBarrierAligner(
-                               inputGate.getNumberOfInputChannels(),
-                               taskName,
-                               toNotifyOnCheckpoint)
-               );
-       }
-
-       /**
-        * Creates a new checkpoint stream aligner.
-        *
-        * <p>The aligner will allow only alignments that buffer up to the 
given number of bytes.
-        * When that number is exceeded, it will stop the alignment and notify 
the task that the
-        * checkpoint has been cancelled.
-        *
-        * @param inputGate The input gate to draw the buffers and events from.
-        * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
-        * @param barrierHandler Handler that controls which channels are 
blocked.
-        */
-       BarrierBuffer(
-                       InputGate inputGate,
-                       BufferStorage bufferStorage,
-                       CheckpointBarrierHandler barrierHandler) {
-               this.inputGate = inputGate;
-               this.bufferStorage = checkNotNull(bufferStorage);
-               this.barrierHandler = barrierHandler;
-       }
-
-       @Override
-       public CompletableFuture<?> isAvailable() {
-               if (bufferStorage.isEmpty()) {
-                       return inputGate.isAvailable();
-               }
-               return AVAILABLE;
-       }
-
-       @Override
-       public Optional<BufferOrEvent> pollNext() throws Exception {
-               while (true) {
-                       // process buffered BufferOrEvents before grabbing new 
ones
-                       Optional<BufferOrEvent> next;
-                       if (bufferStorage.isEmpty()) {
-                               next = inputGate.pollNext();
-                       }
-                       else {
-                               // TODO: FLINK-12536 for non credit-based flow 
control, getNext method is blocking
-                               next = bufferStorage.pollNext();
-                               if (!next.isPresent()) {
-                                       return pollNext();
-                               }
-                       }
-
-                       if (!next.isPresent()) {
-                               return handleEmptyBuffer();
-                       }
-
-                       BufferOrEvent bufferOrEvent = next.get();
-                       if 
(barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
-                               // if the channel is blocked, we just store the 
BufferOrEvent
-                               bufferStorage.add(bufferOrEvent);
-                               if (bufferStorage.isFull()) {
-                                       
barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
-                                       bufferStorage.rollOver();
-                               }
-                       }
-                       else if (bufferOrEvent.isBuffer()) {
-                               return next;
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
-                               CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
-                               if (!endOfInputGate) {
-                                       // process barriers only if there is a 
chance of the checkpoint completing
-                                       if 
(barrierHandler.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
-                                               bufferStorage.rollOver();
-                                       }
-                               }
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               if 
(barrierHandler.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent())) {
-                                       bufferStorage.rollOver();
-                               }
-                       }
-                       else {
-                               if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                       if 
(barrierHandler.processEndOfPartition()) {
-                                               bufferStorage.rollOver();
-                                       }
-                               }
-                               return next;
-                       }
-               }
-       }
-
-       private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
-               if (!inputGate.isFinished()) {
-                       return Optional.empty();
-               }
-
-               if (endOfInputGate) {
-                       isFinished = true;
-                       return Optional.empty();
-               } else {
-                       // end of input stream. stream continues with the 
buffered data
-                       endOfInputGate = true;
-                       barrierHandler.releaseBlocksAndResetBarriers();
-                       bufferStorage.rollOver();
-                       return pollNext();
-               }
-       }
-
-       @Override
-       public boolean isEmpty() {
-               return bufferStorage.isEmpty();
-       }
-
-       @Override
-       public boolean isFinished() {
-               return isFinished;
-       }
-
-       @Override
-       public void cleanup() throws IOException {
-               bufferStorage.close();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the ID defining the current pending, or just completed, 
checkpoint.
-        *
-        * @return The ID of the pending of completed checkpoint.
-        */
-       public long getLatestCheckpointId() {
-               return barrierHandler.getLatestCheckpointId();
-       }
-
-       @Override
-       public long getAlignmentDurationNanos() {
-               return barrierHandler.getAlignmentDurationNanos();
-       }
-
-       @Override
-       public int getNumberOfInputChannels() {
-               return inputGate.getNumberOfInputChannels();
-       }
-
-       // 
------------------------------------------------------------------------
-       // Utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return barrierHandler.toString();
-       }
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
index 628a69c..4927c35 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
@@ -118,7 +118,7 @@ public class CachedBufferStorage extends 
AbstractBufferStorage {
         */
        public static class CachedBufferOrEventSequence implements 
BufferOrEventSequence {
 
-               /** The sequence of buffers and events to be consumed by {@link 
BarrierBuffer}.*/
+               /** The sequence of buffers and events to be consumed by {@link 
CheckpointedInputGate}.*/
                private final ArrayDeque<BufferOrEvent> queuedBuffers;
 
                /** The total size of the cached data. */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index cdbbfbc..7604d0a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,29 +18,213 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.AsyncDataInput;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The {@link CheckpointedInputGate} uses {@link CheckpointBarrierHandler} to 
handle incoming
- * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier} from the 
{@link org.apache.flink.runtime.io.network.partition.consumer.InputGate}.
+ * {@link CheckpointBarrier} from the {@link InputGate}.
  */
 @Internal
-public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> {
+public class CheckpointedInputGate implements AsyncDataInput<BufferOrEvent> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointedInputGate.class);
+
+       private final CheckpointBarrierHandler barrierHandler;
+
+       /** The gate that the buffer draws its input from. */
+       private final InputGate inputGate;
+
+       private final BufferStorage bufferStorage;
+
+       /** Flag to indicate whether we have drawn all available input. */
+       private boolean endOfInputGate;
+
+       /** Indicate end of the input. Set to true after encountering {@link 
#endOfInputGate} and depleting
+        * {@link #bufferStorage}. */
+       private boolean isFinished;
+
        /**
-        * Cleans up all internally held resources.
+        * Creates a new checkpoint stream aligner.
         *
-        * @throws IOException Thrown if the cleanup of I/O resources failed.
+        * <p>There is no limit to how much data may be buffered during an 
alignment.
+        *
+        * @param inputGate The input gate to draw the buffers and events from.
+        * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
         */
-       void cleanup() throws IOException;
+       @VisibleForTesting
+       CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage) 
{
+               this (inputGate, bufferStorage, "Testing: No task associated", 
null);
+       }
+
+       public CheckpointedInputGate(
+                       InputGate inputGate,
+                       BufferStorage bufferStorage,
+                       String taskName,
+                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+               this(
+                       inputGate,
+                       bufferStorage,
+                       new CheckpointBarrierAligner(
+                               inputGate.getNumberOfInputChannels(),
+                               taskName,
+                               toNotifyOnCheckpoint)
+               );
+       }
+
+       /**
+        * Creates a new checkpoint stream aligner.
+        *
+        * <p>The aligner will allow only alignments that buffer up to the 
given number of bytes.
+        * When that number is exceeded, it will stop the alignment and notify 
the task that the
+        * checkpoint has been cancelled.
+        *
+        * @param inputGate The input gate to draw the buffers and events from.
+        * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
+        * @param barrierHandler Handler that controls which channels are 
blocked.
+        */
+       public CheckpointedInputGate(
+                       InputGate inputGate,
+                       BufferStorage bufferStorage,
+                       CheckpointBarrierHandler barrierHandler) {
+               this.inputGate = inputGate;
+               this.bufferStorage = checkNotNull(bufferStorage);
+               this.barrierHandler = barrierHandler;
+       }
+
+       @Override
+       public CompletableFuture<?> isAvailable() {
+               if (bufferStorage.isEmpty()) {
+                       return inputGate.isAvailable();
+               }
+               return AVAILABLE;
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNext() throws Exception {
+               while (true) {
+                       // process buffered BufferOrEvents before grabbing new 
ones
+                       Optional<BufferOrEvent> next;
+                       if (bufferStorage.isEmpty()) {
+                               next = inputGate.pollNext();
+                       }
+                       else {
+                               // TODO: FLINK-12536 for non credit-based flow 
control, getNext method is blocking
+                               next = bufferStorage.pollNext();
+                               if (!next.isPresent()) {
+                                       return pollNext();
+                               }
+                       }
+
+                       if (!next.isPresent()) {
+                               return handleEmptyBuffer();
+                       }
+
+                       BufferOrEvent bufferOrEvent = next.get();
+                       if 
(barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
+                               // if the channel is blocked, we just store the 
BufferOrEvent
+                               bufferStorage.add(bufferOrEvent);
+                               if (bufferStorage.isFull()) {
+                                       
barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+                                       bufferStorage.rollOver();
+                               }
+                       }
+                       else if (bufferOrEvent.isBuffer()) {
+                               return next;
+                       }
+                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
+                               CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
+                               if (!endOfInputGate) {
+                                       // process barriers only if there is a 
chance of the checkpoint completing
+                                       if 
(barrierHandler.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+                                               bufferStorage.rollOver();
+                                       }
+                               }
+                       }
+                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                               if 
(barrierHandler.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent())) {
+                                       bufferStorage.rollOver();
+                               }
+                       }
+                       else {
+                               if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
+                                       if 
(barrierHandler.processEndOfPartition()) {
+                                               bufferStorage.rollOver();
+                                       }
+                               }
+                               return next;
+                       }
+               }
+       }
+
+       private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
+               if (!inputGate.isFinished()) {
+                       return Optional.empty();
+               }
+
+               if (endOfInputGate) {
+                       isFinished = true;
+                       return Optional.empty();
+               } else {
+                       // end of input stream. stream continues with the 
buffered data
+                       endOfInputGate = true;
+                       barrierHandler.releaseBlocksAndResetBarriers();
+                       bufferStorage.rollOver();
+                       return pollNext();
+               }
+       }
 
        /**
         * Checks if the barrier handler has buffered any data internally.
         * @return {@code True}, if no data is buffered internally, {@code 
false} otherwise.
         */
-       boolean isEmpty();
+       public boolean isEmpty() {
+               return bufferStorage.isEmpty();
+       }
+
+       @Override
+       public boolean isFinished() {
+               return isFinished;
+       }
+
+       /**
+        * Cleans up all internally held resources.
+        *
+        * @throws IOException Thrown if the cleanup of I/O resources failed.
+        */
+       public void cleanup() throws IOException {
+               bufferStorage.close();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the ID defining the current pending, or just completed, 
checkpoint.
+        *
+        * @return The ID of the pending of completed checkpoint.
+        */
+       public long getLatestCheckpointId() {
+               return barrierHandler.getLatestCheckpointId();
+       }
 
        /**
         * Gets the time that the latest alignment took, in nanoseconds.
@@ -50,10 +233,23 @@ public interface CheckpointedInputGate extends 
AsyncDataInput<BufferOrEvent> {
         *
         * @return The duration in nanoseconds
         */
-       long getAlignmentDurationNanos();
+       public long getAlignmentDurationNanos() {
+               return barrierHandler.getAlignmentDurationNanos();
+       }
 
        /**
         * @return number of underlying input channels.
         */
-       int getNumberOfInputChannels();
+       public int getNumberOfInputChannels() {
+               return inputGate.getNumberOfInputChannels();
+       }
+
+       // 
------------------------------------------------------------------------
+       // Utilities
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return barrierHandler.toString();
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 75926b9..7eda06c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -36,7 +36,7 @@ import java.io.IOException;
 @Internal
 public class InputProcessorUtil {
 
-       public static CheckpointedInputGate createCheckpointBarrierHandler(
+       public static CheckpointedInputGate createCheckpointedInputGate(
                        StreamTask<?, ?> checkpointedTask,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
@@ -44,7 +44,7 @@ public class InputProcessorUtil {
                        Configuration taskManagerConfig,
                        String taskName) throws IOException {
 
-               CheckpointedInputGate barrierHandler;
+               CheckpointedInputGate checkpointedInputGate;
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
                        long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
                        if (!(maxAlign == -1 || maxAlign > 0)) {
@@ -54,20 +54,20 @@ public class InputProcessorUtil {
                        }
 
                        if 
(taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL))
 {
-                               barrierHandler = new BarrierBuffer(
+                               checkpointedInputGate = new 
CheckpointedInputGate(
                                        inputGate,
                                        new 
CachedBufferStorage(inputGate.getPageSize(), maxAlign, taskName),
                                        taskName,
                                        checkpointedTask);
                        } else {
-                               barrierHandler = new BarrierBuffer(
+                               checkpointedInputGate = new 
CheckpointedInputGate(
                                        inputGate,
                                        new BufferSpiller(ioManager, 
inputGate.getPageSize(), maxAlign, taskName),
                                        taskName,
                                        checkpointedTask);
                        }
                } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-                       barrierHandler = new BarrierBuffer(
+                       checkpointedInputGate = new CheckpointedInputGate(
                                inputGate,
                                new EmptyBufferStorage(),
                                new 
CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), 
checkpointedTask));
@@ -75,6 +75,6 @@ public class InputProcessorUtil {
                        throw new IllegalArgumentException("Unrecognized 
Checkpointing Mode: " + checkpointMode);
                }
 
-               return barrierHandler;
+               return checkpointedInputGate;
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 58b2051..d6fcad2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -104,7 +104,7 @@ public class StreamInputProcessor<IN> {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
-               CheckpointedInputGate barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
+               CheckpointedInputGate barrierHandler = 
InputProcessorUtil.createCheckpointedInputGate(
                        checkpointedTask,
                        checkpointMode,
                        ioManager,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index ecf88e2..8c37141 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public final class StreamTaskNetworkInput implements StreamTaskInput {
 
-       private final CheckpointedInputGate barrierHandler;
+       private final CheckpointedInputGate checkpointedInputGate;
 
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate;
 
@@ -63,16 +63,16 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
 
        @SuppressWarnings("unchecked")
        public StreamTaskNetworkInput(
-                       CheckpointedInputGate barrierHandler,
+                       CheckpointedInputGate checkpointedInputGate,
                        TypeSerializer<?> inputSerializer,
                        IOManager ioManager,
                        int inputIndex) {
-               this.barrierHandler = barrierHandler;
+               this.checkpointedInputGate = checkpointedInputGate;
                this.deserializationDelegate = new 
NonReusingDeserializationDelegate<>(
                        new StreamElementSerializer<>(inputSerializer));
 
                // Initialize one deserializer per input channel
-               this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[barrierHandler.getNumberOfInputChannels()];
+               this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
                for (int i = 0; i < recordDeserializers.length; i++) {
                        recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
                                ioManager.getSpillingDirectoriesPaths());
@@ -99,14 +99,14 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
                                }
                        }
 
-                       Optional<BufferOrEvent> bufferOrEvent = 
barrierHandler.pollNext();
+                       Optional<BufferOrEvent> bufferOrEvent = 
checkpointedInputGate.pollNext();
                        if (bufferOrEvent.isPresent()) {
                                processBufferOrEvent(bufferOrEvent.get());
                        } else {
-                               if (barrierHandler.isFinished()) {
+                               if (checkpointedInputGate.isFinished()) {
                                        isFinished = true;
-                                       
checkState(barrierHandler.isAvailable().isDone(), "Finished BarrierHandler 
should be available");
-                                       if (!barrierHandler.isEmpty()) {
+                                       
checkState(checkpointedInputGate.isAvailable().isDone(), "Finished 
BarrierHandler should be available");
+                                       if (!checkpointedInputGate.isEmpty()) {
                                                throw new 
IllegalStateException("Trailing data in checkpoint barrier handler.");
                                        }
                                }
@@ -124,7 +124,7 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
                else {
                        // Event received
                        final AbstractEvent event = bufferOrEvent.getEvent();
-                       // TODO: with barrierHandler.isFinished() we might not 
need to support any events on this level.
+                       // TODO: with checkpointedInputGate.isFinished() we 
might not need to support any events on this level.
                        if (event.getClass() != EndOfPartitionEvent.class) {
                                throw new IOException("Unexpected event: " + 
event);
                        }
@@ -148,7 +148,7 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
 
        @Override
        public CompletableFuture<?> isAvailable() {
-               return barrierHandler.isAvailable();
+               return checkpointedInputGate.isAvailable();
        }
 
        @Override
@@ -162,6 +162,6 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
                        deserializer.clear();
                }
 
-               barrierHandler.cleanup();
+               checkpointedInputGate.cleanup();
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index aa6354d..f989e57 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -154,7 +154,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
 
-               this.barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
+               this.barrierHandler = 
InputProcessorUtil.createCheckpointedInputGate(
                        checkpointedTask,
                        checkpointMode,
                        ioManager,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index d5ebf29..37c17db 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -121,8 +121,8 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
 
                // create a Input instance for each input
                CachedBufferStorage bufferStorage = new 
CachedBufferStorage(unionedInputGate1.getPageSize());
-               this.input1 = new StreamTaskNetworkInput(new 
BarrierBuffer(unionedInputGate1, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
-               this.input2 = new StreamTaskNetworkInput(new 
BarrierBuffer(unionedInputGate2, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
+               this.input1 = new StreamTaskNetworkInput(new 
CheckpointedInputGate(unionedInputGate1, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
+               this.input2 = new StreamTaskNetworkInput(new 
CheckpointedInputGate(unionedInputGate2, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
 
                this.statusWatermarkValve1 = new StatusWatermarkValve(
                        unionedInputGate1.getNumberOfInputChannels(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
similarity index 94%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
index 2eb3f5c..0621179 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -56,9 +57,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
- * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ * Tests for the {@link CheckpointBarrierAligner}'s maximum limit of 
buffered/spilled bytes.
  */
-public class BarrierBufferAlignmentLimitTest {
+public class CheckpointBarrierAlignerAlignmentLimitTest {
 
        private static final int PAGE_SIZE = 512;
 
@@ -116,7 +117,7 @@ public class BarrierBufferAlignmentLimitTest {
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               BarrierBuffer buffer = new BarrierBuffer(
+               CheckpointedInputGate buffer = new CheckpointedInputGate(
                        gate,
                        new BufferSpiller(ioManager, gate.getPageSize(), 1000),
                        "Testing",
@@ -139,7 +140,7 @@ public class BarrierBufferAlignmentLimitTest {
                check(sequence[5], buffer.pollNext().get());
                validateAlignmentTime(startTs, buffer);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L),
-                       argThat(new 
BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
+                       argThat(new 
CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
 
                // playing back buffered events
                check(sequence[7], buffer.pollNext().get());
@@ -213,7 +214,7 @@ public class BarrierBufferAlignmentLimitTest {
                // the barrier buffer has a limit that only 1000 bytes may be 
spilled in alignment
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               BarrierBuffer buffer = new BarrierBuffer(
+               CheckpointedInputGate buffer = new CheckpointedInputGate(
                        gate,
                        new BufferSpiller(ioManager, gate.getPageSize(), 500),
                        "Testing",
@@ -237,7 +238,7 @@ public class BarrierBufferAlignmentLimitTest {
                check(sequence[4], buffer.pollNext().get());
                validateAlignmentTime(startTs, buffer);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L),
-                       argThat(new 
BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
+                       argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
 
                // replay buffered data - in the middle, the alignment for 
checkpoint 4 starts
                check(sequence[6], buffer.pollNext().get());
@@ -314,7 +315,7 @@ public class BarrierBufferAlignmentLimitTest {
                }
        }
 
-       private static void validateAlignmentTime(long startTimestamp, 
BarrierBuffer buffer) {
+       private static void validateAlignmentTime(long startTimestamp, 
CheckpointedInputGate buffer) {
                final long elapsed = System.nanoTime() - startTimestamp;
                assertTrue("wrong alignment time", 
buffer.getAlignmentDurationNanos() <= elapsed);
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
similarity index 95%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 7fc8a5d..7da0aa3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -41,7 +41,7 @@ import static org.junit.Assert.fail;
  * and randomly generate checkpoint barriers. The two streams are very
  * unaligned, putting heavy work on the BarrierBuffer.
  */
-public class BarrierBufferMassiveRandomTest {
+public class CheckpointBarrierAlignerMassiveRandomTest {
 
        private static final int PAGE_SIZE = 1024;
 
@@ -62,10 +62,10 @@ public class BarrierBufferMassiveRandomTest {
                                        new BufferPool[] { pool1, pool2 },
                                        new BarrierGenerator[] { new 
CountBarrier(100000), new RandomBarrier(100000) });
 
-                       BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, 
new BufferSpiller(ioMan, myIG.getPageSize()));
+                       CheckpointedInputGate checkpointedInputGate = new 
CheckpointedInputGate(myIG, new BufferSpiller(ioMan, myIG.getPageSize()));
 
                        for (int i = 0; i < 2000000; i++) {
-                               BufferOrEvent boe = 
barrierBuffer.pollNext().get();
+                               BufferOrEvent boe = 
checkpointedInputGate.pollNext().get();
                                if (boe.isBuffer()) {
                                        boe.getBuffer().recycleBuffer();
                                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
similarity index 63%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
index 13c4aad..687c95d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerTestBase.java
@@ -60,9 +60,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer} with different {@link 
BufferStorage} implements.
+ * Tests for the behavior of the {@link CheckpointBarrierAligner} with 
different {@link BufferStorage} implementations.
  */
-public abstract class BarrierBufferTestBase {
+public abstract class CheckpointBarrierAlignerTestBase {
 
        protected static final int PAGE_SIZE = 512;
 
@@ -70,9 +70,9 @@ public abstract class BarrierBufferTestBase {
 
        private static int sizeCounter = 1;
 
-       BarrierBuffer buffer;
+       CheckpointedInputGate inputGate;
 
-       protected BarrierBuffer createBarrierBuffer(
+       protected CheckpointedInputGate createBarrierBuffer(
                int numberOfChannels,
                BufferOrEvent[] sequence,
                @Nullable AbstractInvokable toNotify) throws IOException {
@@ -80,25 +80,25 @@ public abstract class BarrierBufferTestBase {
                return createBarrierBuffer(gate, toNotify);
        }
 
-       protected BarrierBuffer createBarrierBuffer(int numberOfChannels, 
BufferOrEvent[] sequence) throws IOException {
+       protected CheckpointedInputGate createBarrierBuffer(int 
numberOfChannels, BufferOrEvent[] sequence) throws IOException {
                return createBarrierBuffer(numberOfChannels, sequence, null);
        }
 
-       protected BarrierBuffer createBarrierBuffer(InputGate gate) throws 
IOException {
+       protected CheckpointedInputGate createBarrierBuffer(InputGate gate) 
throws IOException {
                return createBarrierBuffer(gate, null);
        }
 
-       abstract BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable 
AbstractInvokable toNotify) throws IOException;
+       abstract CheckpointedInputGate createBarrierBuffer(InputGate gate, 
@Nullable AbstractInvokable toNotify) throws IOException;
 
        abstract void validateAlignmentBuffered(long actualBytesBuffered, 
BufferOrEvent... sequence);
 
        @After
        public void ensureEmpty() throws Exception {
-               assertFalse(buffer.pollNext().isPresent());
-               assertTrue(buffer.isFinished());
-               assertTrue(buffer.isEmpty());
+               assertFalse(inputGate.pollNext().isPresent());
+               assertTrue(inputGate.isFinished());
+               assertTrue(inputGate.isEmpty());
 
-               buffer.cleanup();
+               inputGate.cleanup();
        }
 
        // 
------------------------------------------------------------------------
@@ -115,13 +115,13 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0), createBuffer(0),
                        createBuffer(0), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(1, sequence);
+               inputGate = createBarrierBuffer(1, sequence);
 
                for (BufferOrEvent boe : sequence) {
-                       assertEquals(boe, buffer.pollNext().get());
+                       assertEquals(boe, inputGate.pollNext().get());
                }
 
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
        }
 
        /**
@@ -136,13 +136,13 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(3), createBuffer(1), 
createEndOfPartition(3),
                        createBuffer(1), createEndOfPartition(1), 
createBuffer(2), createEndOfPartition(2)
                };
-               buffer = createBarrierBuffer(4, sequence);
+               inputGate = createBarrierBuffer(4, sequence);
 
                for (BufferOrEvent boe : sequence) {
-                       assertEquals(boe, buffer.pollNext().get());
+                       assertEquals(boe, inputGate.pollNext().get());
                }
 
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
        }
 
        /**
@@ -161,13 +161,13 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0), createEndOfPartition(0)
                };
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer = createBarrierBuffer(1, sequence, handler);
+               inputGate = createBarrierBuffer(1, sequence, handler);
 
                handler.setNextExpectedCheckpointId(1L);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
-                               assertEquals(boe, buffer.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -211,81 +211,81 @@ public abstract class BarrierBufferTestBase {
                        createEndOfPartition(0), createEndOfPartition(1), 
createEndOfPartition(2)
                };
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer = createBarrierBuffer(3, sequence, handler);
+               inputGate = createBarrierBuffer(3, sequence, handler);
 
                handler.setNextExpectedCheckpointId(1L);
 
                // pre checkpoint 1
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(1L, handler.getNextExpectedCheckpointId());
 
                long startTs = System.nanoTime();
 
                // blocking while aligning for checkpoint 1
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(1L, handler.getNextExpectedCheckpointId());
 
                // checkpoint 1 done, returning buffered data
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(2L, handler.getNextExpectedCheckpointId());
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
                
validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), 
sequence[5], sequence[6]);
 
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 
                // pre checkpoint 2
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(2L, handler.getNextExpectedCheckpointId());
 
                // checkpoint 2 barriers come together
                startTs = System.nanoTime();
-               check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(3L, handler.getNextExpectedCheckpointId());
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
                
validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment());
 
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 starts, data buffered
-               check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
                
validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), 
sequence[20], sequence[21]);
                assertEquals(4L, handler.getNextExpectedCheckpointId());
-               check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 4 happens without extra data
 
                // pre checkpoint 5
-               check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
 
                
validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment());
                assertEquals(5L, handler.getNextExpectedCheckpointId());
 
-               check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[29], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[29], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 5 aligning
-               check(sequence[31], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[31], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
 
                // buffered data from checkpoint 5 alignment
-               check(sequence[34], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[36], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[38], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[39], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[34], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[36], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[38], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[39], inputGate.pollNext().get(), PAGE_SIZE);
 
                // remaining data
-               check(sequence[41], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[42], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[43], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[44], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[41], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[42], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[43], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[44], inputGate.pollNext().get(), PAGE_SIZE);
 
                
validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(),
                        sequence[34], sequence[36], sequence[38], sequence[39]);
@@ -304,36 +304,36 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(2), createEndOfPartition(2), 
createBuffer(0), createEndOfPartition(0)
                };
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer = createBarrierBuffer(3, sequence, handler);
+               inputGate = createBarrierBuffer(3, sequence, handler);
 
                handler.setNextExpectedCheckpointId(1L);
 
                // pre-checkpoint 1
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(1L, handler.getNextExpectedCheckpointId());
 
                // pre-checkpoint 2
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(2L, handler.getNextExpectedCheckpointId());
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 2 alignment
                long startTs = System.nanoTime();
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
 
                // end of stream: remaining buffered contents
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        /**
@@ -379,69 +379,69 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0), createEndOfPartition(0)
                };
                ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-               buffer = createBarrierBuffer(3, sequence, handler);
+               inputGate = createBarrierBuffer(3, sequence, handler);
 
                handler.setNextExpectedCheckpointId(1L);
 
                // around checkpoint 1
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
 
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(2L, handler.getNextExpectedCheckpointId());
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
                // alignment of checkpoint 2 - buffering also some barriers for
                // checkpoints 3 and 4
                long startTs = System.nanoTime();
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[23], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[23], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 2 completed
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
-               check(sequence[25], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[30], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
+               check(sequence[25], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 completed (emit buffered)
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
 
                // past checkpoint 3
-               check(sequence[36], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[38], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[36], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[38], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 4 completed (emit buffered)
-               check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[31], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[39], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[31], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[39], inputGate.pollNext().get(), PAGE_SIZE);
 
                // past checkpoint 4, alignment for checkpoint 5
-               check(sequence[42], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[45], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[46], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[42], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[45], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[46], inputGate.pollNext().get(), PAGE_SIZE);
 
                // abort checkpoint 5 (end of partition)
-               check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
 
                // start checkpoint 6 alignment
-               check(sequence[47], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[48], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[47], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[48], inputGate.pollNext().get(), PAGE_SIZE);
 
                // end of input, emit remainder
-               check(sequence[43], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[44], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[43], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[44], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        /**
@@ -471,58 +471,58 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0), createEndOfPartition(0)
                };
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer = createBarrierBuffer(3, sequence, toNotify);
+               inputGate = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
                // initial data
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
 
                // align checkpoint 1
                startTs = System.nanoTime();
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getLatestCheckpointId());
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(1L, inputGate.getLatestCheckpointId());
 
                // checkpoint done - replay buffered
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
                verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
                // alignment of checkpoint 2
                startTs = System.nanoTime();
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 2 aborted, checkpoint 3 started
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(3L, buffer.getLatestCheckpointId());
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(3L, inputGate.getLatestCheckpointId());
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
                verify(toNotify).abortCheckpointOnBarrier(eq(2L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 alignment in progress
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 aborted (end of partition)
-               check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify).abortCheckpointOnBarrier(eq(3L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)));
 
                // replay buffered data from checkpoint 3
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
                // all the remaining messages
-               check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[23], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[23], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        /**
@@ -556,50 +556,50 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0), createEndOfPartition(0)
                };
                ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-               buffer = createBarrierBuffer(3, sequence, handler);
+               inputGate = createBarrierBuffer(3, sequence, handler);
 
                handler.setNextExpectedCheckpointId(1L);
 
                // checkpoint 1
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getLatestCheckpointId());
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(1L, inputGate.getLatestCheckpointId());
 
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
                // alignment of checkpoint 2
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getLatestCheckpointId());
-               check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(2L, inputGate.getLatestCheckpointId());
+               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
 
                long startTs = System.nanoTime();
 
                // checkpoint 2 aborted, checkpoint 4 started. replay buffered
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(4L, buffer.getLatestCheckpointId());
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(4L, inputGate.getLatestCheckpointId());
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
 
                // align checkpoint 4 remainder
-               check(sequence[25], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[25], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
 
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 
                // checkpoint 4 aborted (due to end of partition)
-               check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[29], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[30], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[29], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        /**
@@ -644,48 +644,48 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(2), createEndOfPartition(2),
                        createBuffer(0), createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
+               inputGate = createBarrierBuffer(3, sequence);
 
                // checkpoint 1
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getLatestCheckpointId());
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(1L, inputGate.getLatestCheckpointId());
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
 
                // alignment of checkpoint 2
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getLatestCheckpointId());
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(2L, inputGate.getLatestCheckpointId());
 
                // checkpoint 2 completed
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 skipped, alignment for 4 started
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(4L, buffer.getLatestCheckpointId());
-               check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[30], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(4L, inputGate.getLatestCheckpointId());
+               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[30], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 4 completed
-               check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[29], buffer.pollNext().get(), PAGE_SIZE);
-
-               check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[34], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[35], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[36], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[29], inputGate.pollNext().get(), PAGE_SIZE);
+
+               check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[34], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[35], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[36], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        @Test
@@ -698,11 +698,11 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0)
                };
                AbstractInvokable validator = new CheckpointSequenceValidator(-3);
-               buffer = createBarrierBuffer(2, sequence, validator);
+               inputGate = createBarrierBuffer(2, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
-                               assertEquals(boe, buffer.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -720,34 +720,34 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
                };
                ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-               buffer = createBarrierBuffer(3, sequence, handler);
+               inputGate = createBarrierBuffer(3, sequence, handler);
 
                handler.setNextExpectedCheckpointId(1L);
 
                // pre-checkpoint 1
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(1L, handler.getNextExpectedCheckpointId());
 
                // pre-checkpoint 2
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
                assertEquals(2L, handler.getNextExpectedCheckpointId());
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 2 alignment
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
 
                // drain buffer
-               buffer.pollNext().get();
-               buffer.pollNext().get();
-               buffer.pollNext().get();
-               buffer.pollNext().get();
-               buffer.pollNext().get();
+               inputGate.pollNext().get();
+               inputGate.pollNext().get();
+               inputGate.pollNext().get();
+               inputGate.pollNext().get();
+               inputGate.pollNext().get();
        }
 
        @Test
@@ -779,36 +779,36 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(3),
                        createEndOfPartition(3)
                };
-               buffer = createBarrierBuffer(4, sequence);
+               inputGate = createBarrierBuffer(4, sequence);
 
                // pre checkpoint 2
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 alignment
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getLatestCheckpointId());
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(2L, inputGate.getLatestCheckpointId());
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 buffered
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(3L, buffer.getLatestCheckpointId());
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(3L, inputGate.getLatestCheckpointId());
 
                // after checkpoint 4
-               check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(4L, buffer.getLatestCheckpointId());
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(5L, buffer.getLatestCheckpointId());
-               check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(4L, inputGate.getLatestCheckpointId());
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(5L, inputGate.getLatestCheckpointId());
+               check(sequence[22], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        @Test
@@ -831,25 +831,25 @@ public abstract class BarrierBufferTestBase {
                        // final end of stream
                        createEndOfPartition(0)
                };
-               buffer = createBarrierBuffer(3, sequence);
+               inputGate = createBarrierBuffer(3, sequence);
 
                // data after first checkpoint
-               check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getLatestCheckpointId());
+               check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(1L, inputGate.getLatestCheckpointId());
 
                // alignment of second checkpoint
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getLatestCheckpointId());
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(2L, inputGate.getLatestCheckpointId());
 
                // first end-of-partition encountered: checkpoint will not be completed
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
        }
 
        @Test
@@ -866,26 +866,26 @@ public abstract class BarrierBufferTestBase {
                        createBuffer(0)
                };
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer = createBarrierBuffer(1, sequence, toNotify);
+               inputGate = createBarrierBuffer(1, sequence, toNotify);
 
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(5L, buffer.getLatestCheckpointId());
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(5L, inputGate.getLatestCheckpointId());
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(6L, buffer.getLatestCheckpointId());
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+               assertEquals(6L, inputGate.getLatestCheckpointId());
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
        }
 
        @Test
@@ -925,62 +925,62 @@ public abstract class BarrierBufferTestBase {
                        /* 37 */ createBuffer(0)
                };
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer = createBarrierBuffer(3, sequence, toNotify);
+               inputGate = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
                // successful first checkpoint, with some aligned buffers
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[1], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
                startTs = System.nanoTime();
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
 
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
 
                // canceled checkpoint on last barrier
                startTs = System.nanoTime();
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
 
                // one more successful checkpoint
-               check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
                startTs = System.nanoTime();
-               check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-               check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+               check(sequence[21], inputGate.pollNext().get(), PAGE_SIZE);
 
                // this checkpoint gets immediately canceled
-               check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[24], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
                // some buffers
-               check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[27], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[28], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[26], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[27], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[28], inputGate.pollNext().get(), PAGE_SIZE);
 
                // a simple successful checkpoint
                startTs = System.nanoTime();
-               check(sequence[32], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[32], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos());
-               check(sequence[33], buffer.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, inputGate.getAlignmentDurationNanos());
+               check(sequence[33], inputGate.pollNext().get(), PAGE_SIZE);
 
-               check(sequence[37], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[37], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
        }
 
        @Test
@@ -1011,41 +1011,41 @@ public abstract class BarrierBufferTestBase {
                        /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
                };
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer = createBarrierBuffer(3, sequence, toNotify);
+               inputGate = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
 
                // starting first checkpoint
                startTs = System.nanoTime();
-               check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
 
                // finished first checkpoint
-               check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
 
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 
                // re-read the queued cancellation barriers
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L),
                        argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
 
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
                // no further alignment should have happened
-               assertEquals(0L, buffer.getAlignmentDurationNanos());
+               assertEquals(0L, inputGate.getAlignmentDurationNanos());
 
                // no further checkpoint (abort) notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
@@ -1093,44 +1093,44 @@ public abstract class BarrierBufferTestBase {
                        /* 18 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
                };
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer = createBarrierBuffer(3, sequence, toNotify);
+               inputGate = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
 
                // starting first checkpoint
                startTs = System.nanoTime();
-               check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[2], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[6], inputGate.pollNext().get(), PAGE_SIZE);
 
                // cancelled by cancellation barrier
-               check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
                verify(toNotify).abortCheckpointOnBarrier(eq(1L),
                        argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
                // the next checkpoint alignment starts now
                startTs = System.nanoTime();
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[15], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint done
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
                verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(2L)), any(CheckpointOptions.class), 
any(CheckpointMetrics.class));
 
                // queued data
-               check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[14], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[10], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[14], inputGate.pollNext().get(), PAGE_SIZE);
 
                // trailing data
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[20], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[19], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[20], inputGate.pollNext().get(), PAGE_SIZE);
 
                // check overall notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
@@ -1168,40 +1168,40 @@ public abstract class BarrierBufferTestBase {
                        /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
                };
                AbstractInvokable toNotify = mock(AbstractInvokable.class);
-               buffer = createBarrierBuffer(3, sequence, toNotify);
+               inputGate = createBarrierBuffer(3, sequence, toNotify);
 
                long startTs;
 
                // validate the sequence
 
-               check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[0], inputGate.pollNext().get(), PAGE_SIZE);
 
                // beginning of first checkpoint
-               check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[5], inputGate.pollNext().get(), PAGE_SIZE);
 
                // future barrier aborts checkpoint
                startTs = System.nanoTime();
-               check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[3], inputGate.pollNext().get(), PAGE_SIZE);
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L),
                        argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
-               check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[4], inputGate.pollNext().get(), PAGE_SIZE);
 
                // alignment of next checkpoint
-               check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[8], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[9], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[12], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[13], inputGate.pollNext().get(), PAGE_SIZE);
 
                // checkpoint finished
-               check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
+               check(sequence[7], inputGate.pollNext().get(), PAGE_SIZE);
+               validateAlignmentTime(startTs, 
inputGate.getAlignmentDurationNanos());
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
-               check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[11], inputGate.pollNext().get(), PAGE_SIZE);
 
                // remaining data
-               check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
-               check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
+               check(sequence[16], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[17], inputGate.pollNext().get(), PAGE_SIZE);
+               check(sequence[18], inputGate.pollNext().get(), PAGE_SIZE);
 
                // check overall notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
similarity index 89%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
index 5112f63..2218680 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTrackerTest.java
@@ -42,26 +42,26 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for the behavior of the barrier tracker.
  */
-public class BarrierTrackerTest {
+public class CheckpointBarrierTrackerTest {
 
        private static final int PAGE_SIZE = 512;
 
-       private CheckpointedInputGate tracker;
+       private CheckpointedInputGate inputGate;
 
        @After
        public void ensureEmpty() throws Exception {
-               assertFalse(tracker.pollNext().isPresent());
-               assertTrue(tracker.isFinished());
-               assertTrue(tracker.isEmpty());
+               assertFalse(inputGate.pollNext().isPresent());
+               assertTrue(inputGate.isFinished());
+               assertTrue(inputGate.isEmpty());
        }
 
        @Test
        public void testSingleChannelNoBarriers() throws Exception {
                BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), 
createBuffer(0) };
-               tracker = createBarrierTracker(1, sequence);
+               inputGate = createBarrierTracker(1, sequence);
 
                for (BufferOrEvent boe : sequence) {
-                       assertEquals(boe, tracker.pollNext().get());
+                       assertEquals(boe, inputGate.pollNext().get());
                }
        }
 
@@ -71,10 +71,10 @@ public class BarrierTrackerTest {
                                createBuffer(1), createBuffer(0), 
createBuffer(3),
                                createBuffer(1), createBuffer(1), 
createBuffer(2)
                };
-               tracker = createBarrierTracker(4, sequence);
+               inputGate = createBarrierTracker(4, sequence);
 
                for (BufferOrEvent boe : sequence) {
-                       assertEquals(boe, tracker.pollNext().get());
+                       assertEquals(boe, inputGate.pollNext().get());
                }
        }
 
@@ -91,11 +91,11 @@ public class BarrierTrackerTest {
                };
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
-               tracker = createBarrierTracker(1, sequence, validator);
+               inputGate = createBarrierTracker(1, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -113,11 +113,11 @@ public class BarrierTrackerTest {
                };
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
-               tracker = createBarrierTracker(1, sequence, validator);
+               inputGate = createBarrierTracker(1, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -144,11 +144,11 @@ public class BarrierTrackerTest {
                };
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(1, 2, 3, 4);
-               tracker = createBarrierTracker(3, sequence, validator);
+               inputGate = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -179,11 +179,11 @@ public class BarrierTrackerTest {
                };
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(1, 2, 4);
-               tracker = createBarrierTracker(3, sequence, validator);
+               inputGate = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -253,11 +253,11 @@ public class BarrierTrackerTest {
                };
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 
10);
-               tracker = createBarrierTracker(3, sequence, validator);
+               inputGate = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || boe.getEvent().getClass() != 
CheckpointBarrier.class) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -278,13 +278,13 @@ public class BarrierTrackerTest {
                // negative values mean an expected cancellation call!
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(1, 2, -4, 5, -6);
-               tracker = createBarrierTracker(1, sequence, validator);
+               inputGate = createBarrierTracker(1, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer()) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
-                       assertTrue(tracker.isEmpty());
+                       assertTrue(inputGate.isEmpty());
                }
        }
 
@@ -327,11 +327,11 @@ public class BarrierTrackerTest {
                // negative values mean an expected cancellation call!
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
-               tracker = createBarrierTracker(3, sequence, validator);
+               inputGate = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer()) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -353,11 +353,11 @@ public class BarrierTrackerTest {
                };
                CheckpointSequenceValidator validator =
                        new CheckpointSequenceValidator(-1, -2);
-               tracker = createBarrierTracker(3, sequence, validator);
+               inputGate = createBarrierTracker(3, sequence, validator);
 
                for (BufferOrEvent boe : sequence) {
                        if (boe.isBuffer() || (boe.getEvent().getClass() != 
CheckpointBarrier.class && boe.getEvent().getClass() != 
CancelCheckpointMarker.class)) {
-                               assertEquals(boe, tracker.pollNext().get());
+                               assertEquals(boe, inputGate.pollNext().get());
                        }
                }
        }
@@ -374,7 +374,7 @@ public class BarrierTrackerTest {
                        BufferOrEvent[] sequence,
                        @Nullable AbstractInvokable toNotifyOnCheckpoint) {
                MockInputGate gate = new MockInputGate(PAGE_SIZE, 
numberOfChannels, Arrays.asList(sequence));
-               return new BarrierBuffer(
+               return new CheckpointedInputGate(
                        gate,
                        new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
                        new 
CheckpointBarrierTracker(gate.getNumberOfInputChannels(), 
toNotifyOnCheckpoint));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedCheckpointBarrierAlignerTest.java
similarity index 79%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedCheckpointBarrierAlignerTest.java
index 3db884d..5b942a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedCheckpointBarrierAlignerTest.java
@@ -27,13 +27,13 @@ import javax.annotation.Nullable;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferStorage}.
+ * Tests for the behaviors of the {@link CheckpointedInputGate} with {@link CachedBufferStorage}.
  */
-public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
+public class CreditBasedCheckpointBarrierAlignerTest extends CheckpointBarrierAlignerTestBase {
 
        @Override
-       BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
-               return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), "Testing", toNotify);
+       CheckpointedInputGate createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
+               return new CheckpointedInputGate(gate, new CachedBufferStorage(PAGE_SIZE), "Testing", toNotify);
        }
 
        @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
similarity index 85%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
index f9541a9..c892073 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingCheckpointBarrierAlignerTest.java
@@ -36,9 +36,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer} with {@link BufferSpiller}.
+ * Tests for the behavior of the {@link CheckpointedInputGate} with {@link BufferSpiller}.
  */
-public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
+public class SpillingCheckpointBarrierAlignerTest extends CheckpointBarrierAlignerTestBase {
 
        private static IOManager ioManager;
 
@@ -67,8 +67,8 @@ public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
        }
 
        @Override
-       BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
-               return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), "Testing", toNotify);
+       CheckpointedInputGate createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
+               return new CheckpointedInputGate(gate, new BufferSpiller(ioManager, PAGE_SIZE), "Testing", toNotify);
        }
 
        @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 56c3889..456aea5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -31,7 +31,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.runtime.io.BarrierBufferTestBase;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher;
 
 import org.junit.Test;
 
@@ -110,7 +111,7 @@ public class StreamTaskCancellationBarrierTest {
 
                // the decline call should go to the coordinator
                verify(environment, times(1)).declineCheckpoint(eq(2L),
-                       argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
+                       argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
                // a cancellation barrier should be downstream
                Object result = testHarness.getOutput().poll();
@@ -155,7 +156,7 @@ public class StreamTaskCancellationBarrierTest {
 
                // the decline call should go to the coordinator
                verify(environment, times(1)).declineCheckpoint(eq(2L),
-                       argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
+                       argThat(new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
 
                // a cancellation barrier should be downstream
                Object result = testHarness.getOutput().poll();

Reply via email to