This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 26ca3f6d8af862f30d71ebcf9d0b22eab149ba85
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Dec 2 14:20:26 2020 +0100

    [hotfix][checkpointing] Explicit creation of CheckpointOptions
    
    The motivation is to eliminate subtle bugs when changing checkpoint
    type on the fly.
    1. Only guess options when creating a new barrier from configuration
    2. For other cases provide explicit factory methods
    3. Carry the current checkpoint/barrier requirements instead of the
    initial configuration.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   2 +-
 .../runtime/checkpoint/CheckpointOptions.java      |  89 +++---
 .../runtime/io/network/api/CheckpointBarrier.java  |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   5 +-
 .../runtime/checkpoint/CheckpointOptionsTest.java  |  53 +---
 .../api/serialization/EventSerializerTest.java     |   8 +-
 .../PipelinedSubpartitionWithReadViewTest.java     |  15 +-
 .../partition/consumer/LocalInputChannelTest.java  |   3 +-
 .../partition/consumer/RemoteInputChannelTest.java |  32 +--
 .../runtime/tasks/SourceOperatorStreamTask.java    |   6 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |   6 +-
 .../runtime/io/AlternatingControllerTest.java      | 303 +++++++++------------
 .../runtime/io/InputProcessorUtilTest.java         |   3 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |   2 +-
 .../io/UnalignedControllerCancellationTest.java    |   2 +-
 .../runtime/io/UnalignedControllerTest.java        |   8 +-
 ...tStreamTaskChainedSourcesCheckpointingTest.java |   2 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   5 +-
 18 files changed, 249 insertions(+), 297 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9154989..27635c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -778,7 +778,7 @@ public class CheckpointCoordinator {
                Execution[] executions,
                boolean advanceToEndOfTime) {
 
-               final CheckpointOptions checkpointOptions = 
CheckpointOptions.create(
+               final CheckpointOptions checkpointOptions = 
CheckpointOptions.forConfig(
                        props.getCheckpointType(),
                        checkpointStorageLocation.getLocationReference(),
                        isExactlyOnceMode,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 1f12e7f..a788abd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -25,11 +25,13 @@ import 
org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import java.io.Serializable;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Options for performing the checkpoint.
+ * Options for performing the checkpoint. Note that different
+ * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier barriers} 
may have different options.
  *
  * <p>The {@link CheckpointProperties} are related and cover properties that
  * are only relevant at the {@link CheckpointCoordinator}. These options are
@@ -53,19 +55,59 @@ public class CheckpointOptions implements Serializable {
 
        private final long alignmentTimeout;
 
-       public static CheckpointOptions create(
+       public static CheckpointOptions notExactlyOnce(CheckpointType type, 
CheckpointStorageLocationReference location) {
+               return new CheckpointOptions(
+                       type,
+                       location,
+                       false,
+                       false,
+                       NO_ALIGNMENT_TIME_OUT);
+       }
+
+       public static CheckpointOptions alignedNoTimeout(CheckpointType type, 
CheckpointStorageLocationReference location) {
+               return new CheckpointOptions(
+                       type,
+                       location,
+                       true,
+                       false,
+                       NO_ALIGNMENT_TIME_OUT);
+       }
+
+       public static CheckpointOptions 
unaligned(CheckpointStorageLocationReference location) {
+               return new CheckpointOptions(
+                       CheckpointType.CHECKPOINT,
+                       location,
+                       true,
+                       true,
+                       NO_ALIGNMENT_TIME_OUT);
+       }
+
+       public static CheckpointOptions 
alignedWithTimeout(CheckpointStorageLocationReference location, long 
alignmentTimeout) {
+               return new CheckpointOptions(
+                       CheckpointType.CHECKPOINT,
+                       location,
+                       true,
+                       false,
+                       alignmentTimeout);
+       }
+
+       public static CheckpointOptions forConfig(
                        CheckpointType checkpointType,
                        CheckpointStorageLocationReference locationReference,
                        boolean isExactlyOnceMode,
-                       boolean unalignedCheckpointsEnabled,
+                       boolean isUnalignedEnabled,
                        long alignmentTimeout) {
-               boolean canBeUnaligned = checkpointType == 
CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled;
-               return new CheckpointOptions(
-                       checkpointType,
-                       locationReference,
-                       isExactlyOnceMode,
-                       canBeUnaligned && alignmentTimeout == 0,
-                       canBeUnaligned ? alignmentTimeout : 
NO_ALIGNMENT_TIME_OUT);
+               if (!isExactlyOnceMode) {
+                       return notExactlyOnce(checkpointType, 
locationReference);
+               } else if (checkpointType.isSavepoint()) {
+                       return alignedNoTimeout(checkpointType, 
locationReference);
+               } else if (!isUnalignedEnabled) {
+                       return alignedNoTimeout(checkpointType, 
locationReference);
+               } else if (alignmentTimeout == 0 || alignmentTimeout == 
NO_ALIGNMENT_TIME_OUT) {
+                       return unaligned(locationReference);
+               } else {
+                       return alignedWithTimeout(locationReference, 
alignmentTimeout);
+               }
        }
 
        @VisibleForTesting
@@ -82,6 +124,8 @@ public class CheckpointOptions implements Serializable {
                        boolean isUnalignedCheckpoint,
                        long alignmentTimeout) {
 
+               checkArgument(!isUnalignedCheckpoint || 
!checkpointType.isSavepoint(), "Savepoint can't be unaligned");
+               checkArgument(alignmentTimeout == NO_ALIGNMENT_TIME_OUT || 
!isUnalignedCheckpoint, "Unaligned checkpoint can't have timeout (%s)", 
alignmentTimeout);
                this.checkpointType = checkNotNull(checkpointType);
                this.targetLocation = checkNotNull(targetLocation);
                this.isExactlyOnceMode = isExactlyOnceMode;
@@ -98,7 +142,7 @@ public class CheckpointOptions implements Serializable {
        }
 
        public boolean isTimeoutable() {
-               return !isUnalignedCheckpoint && (alignmentTimeout > 0 && 
alignmentTimeout != NO_ALIGNMENT_TIME_OUT);
+               return isExactlyOnceMode && !isUnalignedCheckpoint && 
(alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT);
        }
 
        // 
------------------------------------------------------------------------
@@ -178,25 +222,8 @@ public class CheckpointOptions implements Serializable {
                return CHECKPOINT_AT_DEFAULT_LOCATION;
        }
 
-       public static CheckpointOptions forCheckpointWithDefaultLocation(
-                       boolean isExactlyOnceMode,
-                       boolean isUnalignedCheckpoint,
-                       long alignmentTimeout) {
-               return new CheckpointOptions(
-                       CheckpointType.CHECKPOINT,
-                       CheckpointStorageLocationReference.getDefault(),
-                       isExactlyOnceMode,
-                       isUnalignedCheckpoint,
-                       alignmentTimeout);
-       }
-
-       public CheckpointOptions asTimedOut() {
-               checkState(checkpointType == CheckpointType.CHECKPOINT);
-               return create(
-                       checkpointType,
-                       targetLocation,
-                       isExactlyOnceMode,
-                       true,
-                       0);
+       public CheckpointOptions toUnaligned() {
+               checkState(!isUnalignedCheckpoint);
+               return unaligned(targetLocation);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index e734616..e7e78e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -119,6 +119,6 @@ public class CheckpointBarrier extends RuntimeEvent {
        }
 
        public CheckpointBarrier asUnaligned() {
-               return checkpointOptions.isUnalignedCheckpoint() ? this : new 
CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut());
+               return checkpointOptions.isUnalignedCheckpoint() ? this : new 
CheckpointBarrier(getId(), getTimestamp(), 
getCheckpointOptions().toUnaligned());
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 128b74c..d5b0851 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2620,7 +2620,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                return new CheckpointCoordinatorBuilder()
                        .setJobId(jobId)
                        .setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
-                       
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
+                       .setCheckpointCoordinatorConfiguration(
+                               CheckpointCoordinatorConfiguration.builder()
+                                       .setAlignmentTimeout(Long.MAX_VALUE)
+                                       
.setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build();
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
index b443e8a..a1485ca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import java.util.Random;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static org.junit.Assert.assertArrayEquals;
@@ -63,64 +64,40 @@ public class CheckpointOptionsTest {
                assertArrayEquals(locationBytes, 
copy.getTargetLocation().getReferenceBytes());
        }
 
-       @Test
+       @Test(expected = IllegalArgumentException.class)
        public void testSavepointNeedsAlignment() {
-               CheckpointStorageLocationReference location = 
CheckpointStorageLocationReference.getDefault();
-               assertTrue(new CheckpointOptions(SAVEPOINT, location, true, 
true, 0).needsAlignment());
-               assertFalse(new CheckpointOptions(SAVEPOINT, location, false, 
true, 0).needsAlignment());
-               assertTrue(new CheckpointOptions(SAVEPOINT, location, true, 
false, 0).needsAlignment());
-               assertFalse(new CheckpointOptions(SAVEPOINT, location, false, 
false, 0).needsAlignment());
+               new CheckpointOptions(SAVEPOINT, 
CheckpointStorageLocationReference.getDefault(), true, true, 0);
        }
 
        @Test
        public void testCheckpointNeedsAlignment() {
                CheckpointStorageLocationReference location = 
CheckpointStorageLocationReference.getDefault();
-               assertFalse(new CheckpointOptions(CHECKPOINT, location, true, 
true, 0).needsAlignment());
-               assertTrue(new CheckpointOptions(CHECKPOINT, location, true, 
false, 0).needsAlignment());
-               assertFalse(new CheckpointOptions(CHECKPOINT, location, false, 
true, 0).needsAlignment());
-               assertFalse(new CheckpointOptions(CHECKPOINT, location, false, 
false, 0).needsAlignment());
+               assertFalse(new CheckpointOptions(CHECKPOINT, location, true, 
true, Long.MAX_VALUE).needsAlignment());
+               assertTrue(new CheckpointOptions(CHECKPOINT, location, true, 
false, Long.MAX_VALUE).needsAlignment());
+               assertFalse(new CheckpointOptions(CHECKPOINT, location, false, 
true, Long.MAX_VALUE).needsAlignment());
+               assertFalse(new CheckpointOptions(CHECKPOINT, location, false, 
false, Long.MAX_VALUE).needsAlignment());
        }
 
        @Test
        public void testCheckpointIsTimeoutable() {
                CheckpointStorageLocationReference location = 
CheckpointStorageLocationReference.getDefault();
                assertTimeoutable(
-                       CheckpointOptions.create(
-                               CHECKPOINT,
-                               location,
-                               true,
-                               true,
-                               10),
+                       CheckpointOptions.alignedWithTimeout(location, 10),
                        false,
                        true,
                        10);
                assertTimeoutable(
-                       CheckpointOptions.create(
-                               CHECKPOINT,
-                               location,
-                               true,
-                               false,
-                               10),
-                       false,
-                       false,
-                       CheckpointOptions.NO_ALIGNMENT_TIME_OUT);
-               assertTimeoutable(
-                       CheckpointOptions.create(
-                               CHECKPOINT,
-                               location,
-                               true,
-                               true,
-                               0),
+                       CheckpointOptions.unaligned(location),
                        true,
                        false,
-                       0);
+                       NO_ALIGNMENT_TIME_OUT);
        }
 
        private void assertTimeoutable(CheckpointOptions options, boolean 
isUnaligned, boolean isTimeoutable, long timeout) {
-               assertTrue(options.isExactlyOnceMode());
-               assertEquals(!isUnaligned, options.needsAlignment());
-               assertEquals(isUnaligned, options.isUnalignedCheckpoint());
-               assertEquals(isTimeoutable, options.isTimeoutable());
-               assertEquals(timeout, options.getAlignmentTimeout());
+               assertTrue("exactly once", options.isExactlyOnceMode());
+               assertEquals("need alignment", !isUnaligned, 
options.needsAlignment());
+               assertEquals("unaligned", isUnaligned, 
options.isUnalignedCheckpoint());
+               assertEquals("timeoutable", isTimeoutable, 
options.isTimeoutable());
+               assertEquals("timeout", timeout, options.getAlignmentTimeout());
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ee97672..06c375f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -55,12 +54,7 @@ public class EventSerializerTest {
                new EventAnnouncement(new CheckpointBarrier(
                        42L,
                        1337L,
-                       CheckpointOptions.create(
-                               CheckpointType.CHECKPOINT,
-                               CheckpointStorageLocationReference.getDefault(),
-                               true,
-                               true,
-                               10)),
+                       
CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(),
 10)),
                        44)
        };
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 38cd22f..b2a4c23 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -338,12 +337,7 @@ public class PipelinedSubpartitionWithReadViewTest {
                assertEquals(1, availablityListener.getNumNotifications());
                assertEquals(0, availablityListener.getNumPriorityEvents());
 
-               CheckpointOptions options = new CheckpointOptions(
-                       CheckpointType.CHECKPOINT,
-                       new CheckpointStorageLocationReference(new byte[]{0, 1, 
2}),
-                       true,
-                       true,
-                       0);
+               CheckpointOptions options = CheckpointOptions.unaligned(new 
CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
                channelStateWriter.start(0, options);
                BufferConsumer barrierBuffer = 
EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true);
                subpartition.add(barrierBuffer);
@@ -366,12 +360,7 @@ public class PipelinedSubpartitionWithReadViewTest {
        public void testAvailabilityAfterPriority() throws Exception {
                subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
 
-               CheckpointOptions options = new CheckpointOptions(
-                       CheckpointType.CHECKPOINT,
-                       new CheckpointStorageLocationReference(new byte[]{0, 1, 
2}),
-                       true,
-                       true,
-                       0);
+               CheckpointOptions options = CheckpointOptions.unaligned(new 
CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
                BufferConsumer barrierBuffer = 
EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true);
                subpartition.add(barrierBuffer);
                assertEquals(1, availablityListener.getNumNotifications());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8785a68..7b69780 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -501,7 +500,7 @@ public class LocalInputChannelTest {
                channel.requestSubpartition(0);
 
                final CheckpointStorageLocationReference location = 
getDefault();
-               CheckpointOptions options = new 
CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0);
+               CheckpointOptions options = 
CheckpointOptions.unaligned(location);
                stateWriter.start(0, options);
 
                final CheckpointBarrier barrier = new CheckpointBarrier(0, 
123L, options);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 2954a3f..d13cb72 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
@@ -83,6 +82,7 @@ import static 
org.apache.flink.runtime.io.network.partition.AvailabilityUtil.ass
 import static 
org.apache.flink.runtime.io.network.partition.AvailabilityUtil.assertPriorityAvailability;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
+import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -107,7 +107,9 @@ import static org.mockito.Mockito.when;
  */
 public class RemoteInputChannelTest {
 
-       public static final long CHECKPOINT_ID = 1L;
+       private static final long CHECKPOINT_ID = 1L;
+       private static final CheckpointOptions UNALIGNED = 
CheckpointOptions.unaligned(getDefault());
+       private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = 
CheckpointOptions.alignedWithTimeout(getDefault(), 10);
 
        @Test
        public void testExceptionOnReordering() throws Exception {
@@ -1136,7 +1138,7 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 0);
+               sendBarrier(channel, sequenceNumber++, UNALIGNED);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
 
@@ -1150,7 +1152,7 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 0);
+               sendBarrier(channel, sequenceNumber++, UNALIGNED);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                assertInflightBufferSizes(channel, 1, 2);
@@ -1176,7 +1178,7 @@ public class RemoteInputChannelTest {
                        int sequenceNumber = startingSequence;
                        sendBuffer(channel, sequenceNumber++, bufferSize++);
                        sendBuffer(channel, sequenceNumber++, bufferSize++);
-                       sendBarrier(channel, sequenceNumber++, 0);
+                       sendBarrier(channel, sequenceNumber++, UNALIGNED);
                        sendBuffer(channel, sequenceNumber++, bufferSize++);
                        sendBuffer(channel, sequenceNumber++, bufferSize++);
                        assertThat(
@@ -1193,7 +1195,7 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 0);
+               sendBarrier(channel, sequenceNumber++, UNALIGNED);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                assertGetNextBufferSequenceNumbers(channel, 2, 0);
@@ -1213,7 +1215,7 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 10);
+               sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
 
@@ -1239,7 +1241,7 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 10);
+               sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                assertInflightBufferSizes(channel, 1, 2);
@@ -1252,7 +1254,7 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 10);
+               sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                assertGetNextBufferSequenceNumbers(channel, 2);
@@ -1266,20 +1268,14 @@ public class RemoteInputChannelTest {
                final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
-               sendBarrier(channel, sequenceNumber++, 10);
+               sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                sendBuffer(channel, sequenceNumber++, bufferSize++);
                assertGetNextBufferSequenceNumbers(channel, 2, 0);
                assertInflightBufferSizes(channel, 2);
        }
 
-       private void sendBarrier(RemoteInputChannel channel, int 
sequenceNumber, int alignmentTimeout) throws IOException {
-               CheckpointOptions checkpointOptions = CheckpointOptions.create(
-                       CHECKPOINT,
-                       CheckpointStorageLocationReference.getDefault(),
-                       true,
-                       true,
-                       alignmentTimeout);
+       private void sendBarrier(RemoteInputChannel channel, int 
sequenceNumber, CheckpointOptions checkpointOptions) throws IOException {
                send(
                        channel,
                        sequenceNumber,
@@ -1438,7 +1434,7 @@ public class RemoteInputChannelTest {
                SingleInputGate inputGate = new 
SingleInputGateBuilder().build();
                RemoteInputChannel channel = 
InputChannelTestUtils.createRemoteInputChannel(inputGate, 0);
 
-               CheckpointOptions options = new CheckpointOptions(CHECKPOINT, 
CheckpointStorageLocationReference.getDefault());
+               CheckpointOptions options = new CheckpointOptions(CHECKPOINT, 
getDefault());
                assertPriorityAvailability(inputGate, false, false, () ->
                        assertAvailability(inputGate, false, true, () -> {
                                channel.onBuffer(toBuffer(new 
CheckpointBarrier(1L, 123L, options), false), 0, 0);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 1f91062..ec20fdf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -23,7 +23,9 @@ import 
org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -131,7 +133,9 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
        // --------------------------
 
        private void triggerCheckpointForExternallyInducedSource(long 
checkpointId) {
-               final CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation(
+               final CheckpointOptions checkpointOptions = 
CheckpointOptions.forConfig(
+                       CheckpointType.CHECKPOINT,
+                       CheckpointStorageLocationReference.getDefault(),
                        configuration.isExactlyOnceCheckpointMode(),
                        configuration.isUnalignedCheckpointsEnabled(),
                        configuration.getAlignmentTimeout());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index afd67f3..5b10df5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -90,7 +92,9 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
                                        // TODO - we need to see how to derive 
those. We should probably not encode this in the
                                        // TODO -   source's trigger message, 
but do a handshake in this task between the trigger
                                        // TODO -   message from the master, 
and the source's trigger notification
-                                       final CheckpointOptions 
checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(
+                                       final CheckpointOptions 
checkpointOptions = CheckpointOptions.forConfig(
+                                               CheckpointType.CHECKPOINT,
+                                               
CheckpointStorageLocationReference.getDefault(),
                                                
configuration.isExactlyOnceCheckpointMode(),
                                                
configuration.isUnalignedCheckpointsEnabled(),
                                                
configuration.getAlignmentTimeout());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index d998649..a7be650 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -23,20 +23,19 @@ import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import 
org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
@@ -50,15 +49,18 @@ import java.util.Optional;
 
 import static java.util.Collections.singletonList;
 import static junit.framework.TestCase.assertTrue;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedWithTimeout;
+import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
+import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
+import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -146,7 +148,7 @@ public class AlternatingControllerTest {
                        barriers.add(barrier);
                        CheckpointType type = barrier % 2 == 0 ? CHECKPOINT : 
SAVEPOINT;
                        for (int channel = 0; channel < numChannels; channel++) 
{
-                               sendBarrier(barrier, type, (TestInputChannel) 
gate.getChannel(channel), gate);
+                               send(barrier(barrier, 
System.currentTimeMillis(), alignedNoTimeout(type, 
getDefault())).retainBuffer(), channel, gate);
                        }
                }
                assertEquals(barriers, target.triggeredCheckpoints);
@@ -155,28 +157,22 @@ public class AlternatingControllerTest {
        @Test
        public void testAlignedNeverTimeoutableCheckpoint() throws Exception {
                int numChannels = 2;
-               int bufferSize = 1000;
                ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
                CheckpointedInputGate gate = buildGate(target, numChannels);
 
-               long checkpointCreationTime = System.currentTimeMillis();
-               // Aligned checkpoint that never times out
-               Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, 
checkpointCreationTime, Long.MAX_VALUE);
-               send(neverTimeoutableCheckpoint, gate, 0);
-               sendBuffer(bufferSize, gate, 1);
+               Buffer neverTimeoutableCheckpoint = 
withTimeout(Integer.MAX_VALUE);
+               send(neverTimeoutableCheckpoint, 0, gate);
+               sendData(1000, 1, gate);
                assertEquals(0, target.getTriggeredCheckpointCounter());
 
-               send(neverTimeoutableCheckpoint, gate, 1);
+               send(neverTimeoutableCheckpoint, 1, gate);
                assertEquals(1, target.getTriggeredCheckpointCounter());
        }
 
        @Test
        public void testTimeoutAlignment() throws Exception {
-               int numChannels = 2;
                ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
-               CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels);
-
-               testTimeoutBarrierOnTwoChannels(target, gate);
+               testTimeoutBarrierOnTwoChannels(target, 
buildRemoteInputGate(target, 2));
        }
 
        @Test
@@ -185,15 +181,7 @@ public class AlternatingControllerTest {
                ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
                CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels);
 
-               long alignmentTimeout = 10;
-               long checkpointCreationTime = System.currentTimeMillis() - 2 * 
alignmentTimeout;
-               Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, 
checkpointCreationTime, Long.MAX_VALUE);
-
-               RemoteInputChannel channel2 = (RemoteInputChannel) 
gate.getChannel(2);
-
-               channel2.onBuffer(neverTimeoutableCheckpoint.retainBuffer(), 0, 
0);
-               while (gate.pollNext().isPresent()) {
-               }
+               send(barrier(1, System.currentTimeMillis(), 
alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), 2, gate);
 
                assertEquals(0, target.getTriggeredCheckpointCounter());
 
@@ -201,46 +189,31 @@ public class AlternatingControllerTest {
        }
 
        private void 
testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, 
CheckpointedInputGate gate) throws Exception {
-               int bufferSize = 1000;
                long alignmentTimeout = 10;
-               long checkpointCreationTime = System.currentTimeMillis() - 2 * 
alignmentTimeout;
-               Buffer checkpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, alignmentTimeout);
-               Buffer buffer = TestBufferFactory.createBuffer(bufferSize);
-
-               RemoteInputChannel channel0 = (RemoteInputChannel) 
gate.getChannel(0);
-               RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1);
-               channel0.onBuffer(buffer.retainBuffer(), 0, 0);
-               channel0.onBuffer(buffer.retainBuffer(), 1, 0);
-               channel0.onBuffer(checkpointBarrier.retainBuffer(), 2, 0);
-               channel1.onBuffer(buffer.retainBuffer(), 0, 0);
-               channel1.onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
+               Buffer checkpointBarrier = withTimeout(alignmentTimeout);
+
+               getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
+               getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0);
+               getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 
2, 0);
+               getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
+               getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 
1, 0);
 
                assertEquals(0, target.getTriggeredCheckpointCounter());
-               // First announcements and prioritsed barriers
-               List<AbstractEvent> events = new ArrayList<>();
-               events.add(gate.pollNext().get().getEvent());
+               assertAnnouncement(gate);
                Thread.sleep(alignmentTimeout * 2);
-               events.add(gate.pollNext().get().getEvent());
-               events.add(gate.pollNext().get().getEvent());
-               events.add(gate.pollNext().get().getEvent());
-               assertThat(events, containsInAnyOrder(
-                       instanceOf(EventAnnouncement.class),
-                       instanceOf(EventAnnouncement.class),
-                       instanceOf(CheckpointBarrier.class),
-                       instanceOf(CheckpointBarrier.class)));
+               assertAnnouncement(gate);
+               assertBarrier(gate);
+               assertBarrier(gate);
                assertEquals(1, target.getTriggeredCheckpointCounter());
-               assertThat(
-                       target.getTriggeredCheckpointOptions(),
-                       contains(CheckpointOptions.create(
-                               CHECKPOINT,
-                               CheckpointStorageLocationReference.getDefault(),
-                               true,
-                               true,
-                               0)));
+               assertThat(target.getTriggeredCheckpointOptions(), 
contains(unaligned(getDefault())));
                // Followed by overtaken buffers
-               assertFalse(gate.pollNext().get().isEvent());
-               assertFalse(gate.pollNext().get().isEvent());
-               assertFalse(gate.pollNext().get().isEvent());
+               assertData(gate);
+               assertData(gate);
+               assertData(gate);
+       }
+
+       private Buffer dataBuffer() {
+               return createBuffer(100).retainBuffer();
        }
 
        /**
@@ -254,27 +227,21 @@ public class AlternatingControllerTest {
                CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels);
 
                long alignmentTimeout = 100;
-               long checkpointCreationTime = System.currentTimeMillis();
-               Buffer checkpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, alignmentTimeout);
+               Buffer checkpointBarrier = withTimeout(alignmentTimeout);
 
-               RemoteInputChannel channel0 = (RemoteInputChannel) 
gate.getChannel(0);
-               RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1);
-               channel0.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
-               channel1.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+               for (int i = 0; i < numChannels; i++) {
+                       (getChannel(gate, 
i)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+               }
 
                assertEquals(0, target.getTriggeredCheckpointCounter());
-               // First announcements and prioritsed barriers
-               List<AbstractEvent> events = new ArrayList<>();
-               events.add(gate.pollNext().get().getEvent());
-               events.add(gate.pollNext().get().getEvent());
+               for (int i = 0; i < numChannels; i++) {
+                       assertAnnouncement(gate);
+               }
+               assertEquals(0, target.getTriggeredCheckpointCounter());
 
-               Thread.sleep(alignmentTimeout * 2);
+               Thread.sleep(alignmentTimeout * 4);
 
-               events.add(gate.pollNext().get().getEvent());
-               assertThat(events, contains(
-                       instanceOf(EventAnnouncement.class),
-                       instanceOf(EventAnnouncement.class),
-                       instanceOf(CheckpointBarrier.class)));
+               assertBarrier(gate);
                assertEquals(1, target.getTriggeredCheckpointCounter());
        }
 
@@ -284,43 +251,29 @@ public class AlternatingControllerTest {
         */
        @Test
        public void testTimeoutAlignmentOnUnalignedCheckpoint() throws 
Exception {
-               int numChannels = 2;
                ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
                RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
-               CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels, channelStateWriter);
-
-               long checkpointCreationTime = System.currentTimeMillis();
-               Buffer alignedCheckpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, Integer.MAX_VALUE);
-               Buffer unalignedCheckpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, 0);
-               Buffer buffer = TestBufferFactory.createBuffer(1000);
+               CheckpointedInputGate gate = buildRemoteInputGate(target, 2, 
channelStateWriter);
 
-               RemoteInputChannel channel0 = (RemoteInputChannel) 
gate.getChannel(0);
-               RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1);
-               channel0.onBuffer(alignedCheckpointBarrier.retainBuffer(), 0, 
0);
+               getChannel(gate, 
0).onBuffer(withTimeout(Integer.MAX_VALUE).retainBuffer(), 0, 0);
 
-               List<AbstractEvent> events = new ArrayList<>();
-               events.add(gate.pollNext().get().getEvent());
-               events.add(gate.pollNext().get().getEvent());
+               assertAnnouncement(gate);
+               assertBarrier(gate);
 
-               assertThat(events, contains(
-                       instanceOf(EventAnnouncement.class),
-                       instanceOf(CheckpointBarrier.class)));
+               getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
+               getChannel(gate, 1).onBuffer(dataBuffer(), 1, 0);
+               getChannel(gate, 1).onBuffer(toBuffer(new CheckpointBarrier(1, 
System.currentTimeMillis(), unaligned(getDefault())), true).retainBuffer(), 2, 
0);
 
-               channel1.onBuffer(buffer.retainBuffer(), 0, 0);
-               channel1.onBuffer(buffer.retainBuffer(), 1, 0);
-               channel1.onBuffer(unalignedCheckpointBarrier.retainBuffer(), 2, 
0);
+               assertBarrier(gate);
 
-               events.add(gate.pollNext().get().getEvent());
-
-               assertThat(events, contains(
-                       instanceOf(EventAnnouncement.class),
-                       instanceOf(CheckpointBarrier.class),
-                       instanceOf(CheckpointBarrier.class)));
-
-               
assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()).size(),
 2);
+               
assertEquals(channelStateWriter.getAddedInput().get(getChannel(gate, 
1).getChannelInfo()).size(), 2);
                assertEquals(1, target.getTriggeredCheckpointCounter());
        }
 
+       private RemoteInputChannel getChannel(CheckpointedInputGate gate, int 
channelIndex) {
+               return (RemoteInputChannel) gate.getChannel(channelIndex);
+       }
+
        @Test
        public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws 
Exception {
                testTimeoutAlignmentConsistency(true, false, false);
@@ -352,9 +305,8 @@ public class AlternatingControllerTest {
                                new 
TestSubtaskCheckpointCoordinator(channelStateWriter),
                                gate));
 
-               long checkpointCreationTime = System.currentTimeMillis();
                long alignmentTimeout = 10;
-               CheckpointBarrier barrier = checkpointBarrier(1, CHECKPOINT, 
checkpointCreationTime, alignmentTimeout);
+               CheckpointBarrier barrier = new CheckpointBarrier(1, 
System.currentTimeMillis(), alignedNoTimeout(CHECKPOINT, getDefault()));
 
                InputChannelInfo channelInfo = channel0.getChannelInfo();
 
@@ -417,12 +369,12 @@ public class AlternatingControllerTest {
                long startNanos = System.nanoTime();
                long checkpoint1CreationTime = System.currentTimeMillis() - 10;
                sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0);
-               sendBuffer(bufferSize, gate, 0);
-               sendBuffer(bufferSize, gate, 1);
+               sendData(bufferSize, 0, gate);
+               sendData(bufferSize, 1, gate);
 
                Thread.sleep(6);
                sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 1);
-               sendBuffer(bufferSize, gate, 0);
+               sendData(bufferSize, 0, gate);
 
                assertMetrics(
                        target,
@@ -436,7 +388,7 @@ public class AlternatingControllerTest {
                startNanos = System.nanoTime();
                long checkpoint2CreationTime = System.currentTimeMillis() - 5;
                sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0);
-               sendBuffer(bufferSize, gate, 1);
+               sendData(bufferSize, 1, gate);
 
                assertMetrics(
                        target,
@@ -448,7 +400,7 @@ public class AlternatingControllerTest {
                        bufferSize * 2);
                Thread.sleep(5);
                sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 1);
-               sendBuffer(bufferSize, gate, 0);
+               sendData(bufferSize, 0, gate);
 
                assertMetrics(
                        target,
@@ -461,9 +413,9 @@ public class AlternatingControllerTest {
 
                startNanos = System.nanoTime();
                long checkpoint3CreationTime = System.currentTimeMillis() - 7;
-               sendBarrier(3, checkpoint3CreationTime, CHECKPOINT, gate, 0);
-               sendBuffer(bufferSize, gate, 0);
-               sendBuffer(bufferSize, gate, 1);
+               send(barrier(3, checkpoint3CreationTime, 
unaligned(getDefault())), 0, gate);
+               sendData(bufferSize, 0, gate);
+               sendData(bufferSize, 1, gate);
                assertMetrics(
                        target,
                        gate.getCheckpointBarrierHandler(),
@@ -473,7 +425,7 @@ public class AlternatingControllerTest {
                        7_000_000L,
                        -1L);
                Thread.sleep(10);
-               sendBarrier(3, checkpoint2CreationTime, CHECKPOINT, gate, 1);
+               send(barrier(3, checkpoint2CreationTime, 
unaligned(getDefault())), 1, gate);
                assertMetrics(
                        target,
                        gate.getCheckpointBarrierHandler(),
@@ -487,16 +439,15 @@ public class AlternatingControllerTest {
        @Test
        public void testMetricsSingleChannel() throws Exception {
                int numChannels = 1;
-               int bufferSize = 1000;
                ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
                CheckpointedInputGate gate = buildGate(target, numChannels);
 
                long checkpoint1CreationTime = System.currentTimeMillis() - 10;
                long startNanos = System.nanoTime();
 
-               sendBuffer(bufferSize, gate, 0);
+               sendData(1000, 0, gate);
                sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0);
-               sendBuffer(bufferSize, gate, 0);
+               sendData(1000, 0, gate);
                Thread.sleep(6);
                assertMetrics(
                        target,
@@ -509,9 +460,9 @@ public class AlternatingControllerTest {
 
                long checkpoint2CreationTime = System.currentTimeMillis() - 5;
                startNanos = System.nanoTime();
-               sendBuffer(bufferSize, gate, 0);
+               sendData(1000, 0, gate);
                sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0);
-               sendBuffer(bufferSize, gate, 0);
+               sendData(1000, 0, gate);
                Thread.sleep(5);
                assertMetrics(
                        target,
@@ -561,7 +512,7 @@ public class AlternatingControllerTest {
                        if (type.isSavepoint()) {
                                channels[channel].setBlocked(true);
                        }
-                       barrierHandler.processBarrier(new CheckpointBarrier(i, 
System.currentTimeMillis(), new CheckpointOptions(type, 
CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 
channel));
+                       barrierHandler.processBarrier(new CheckpointBarrier(i, 
System.currentTimeMillis(), new CheckpointOptions(type, getDefault())), new 
InputChannelInfo(0, channel));
                        if (type.isSavepoint()) {
                                assertTrue(channels[channel].isBlocked());
                                assertFalse(channels[(channel + 1) % 
2].isBlocked());
@@ -587,7 +538,7 @@ public class AlternatingControllerTest {
                SingleCheckpointBarrierHandler barrierHandler = 
barrierHandler(inputGate, target);
 
                final long id = 1;
-               barrierHandler.processBarrier(new CheckpointBarrier(id, 
System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, 
CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0));
+               barrierHandler.processBarrier(new CheckpointBarrier(id, 
System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, getDefault())), 
new InputChannelInfo(0, 0));
 
                
assertFalse(barrierHandler.getAllBarriersReceivedFuture(id).isDone());
        }
@@ -604,9 +555,9 @@ public class AlternatingControllerTest {
                long checkpointId = 10;
                long outOfOrderSavepointId = 5;
 
-               barrierHandler.processBarrier(new 
CheckpointBarrier(checkpointId, System.currentTimeMillis(), new 
CheckpointOptions(CHECKPOINT, 
CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0));
+               barrierHandler.processBarrier(new 
CheckpointBarrier(checkpointId, System.currentTimeMillis(), new 
CheckpointOptions(CHECKPOINT, getDefault())), new InputChannelInfo(0, 0));
                secondChannel.setBlocked(true);
-               barrierHandler.processBarrier(new 
CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new 
CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault())), 
new InputChannelInfo(0, 1));
+               barrierHandler.processBarrier(new 
CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new 
CheckpointOptions(SAVEPOINT, getDefault())), new InputChannelInfo(0, 1));
 
                assertEquals(checkpointId, 
barrierHandler.getLatestCheckpointId());
                assertFalse(secondChannel.isBlocked());
@@ -627,9 +578,11 @@ public class AlternatingControllerTest {
                        slow.setBlocked(true);
                }
 
-               sendBarrier(barrierId, checkpointType, fast, checkpointedGate);
+               CheckpointOptions options = checkpointType.isSavepoint() ? 
alignedNoTimeout(checkpointType, getDefault()) : unaligned(getDefault());
+               Buffer barrier = barrier(barrierId, 1, options);
+               send(barrier.retainBuffer(), fast, checkpointedGate);
                assertEquals(checkpointType.isSavepoint(), 
target.triggeredCheckpoints.isEmpty());
-               sendBarrier(barrierId, checkpointType, slow, checkpointedGate);
+               send(barrier.retainBuffer(), slow, checkpointedGate);
 
                assertEquals(singletonList(barrierId), 
target.triggeredCheckpoints);
                if (checkpointType.isSavepoint()) {
@@ -642,29 +595,43 @@ public class AlternatingControllerTest {
        }
 
        private void sendBarrier(long barrierId, long barrierCreationTime, 
CheckpointType type, CheckpointedInputGate gate, int channelId) throws 
Exception {
-               send(barrier(barrierId, type, barrierCreationTime), gate, 
channelId);
+               send(barrier(barrierId, barrierCreationTime, 
alignedNoTimeout(type, getDefault())), channelId, gate);
        }
 
-       private void send(Buffer buffer, CheckpointedInputGate gate, int 
channelId) throws Exception {
-               TestInputChannel channel = (TestInputChannel) 
gate.getChannel(channelId);
-               channel.read(buffer.retainBuffer());
-               while (gate.pollNext().isPresent()) {
-               }
+       private void sendData(int dataSize, int channelId, 
CheckpointedInputGate gate) throws Exception {
+               send(createBuffer(dataSize), channelId, gate);
        }
 
-       private void sendBarrier(long barrierId, CheckpointType type, 
TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
-               channel.read(barrier(barrierId, type).retainBuffer());
-               while (gate.pollNext().isPresent()) {
-               }
+       private void send(Buffer buffer, int channelId, CheckpointedInputGate 
gate) throws Exception {
+               send(buffer.retainBuffer(), gate.getChannel(channelId), gate);
        }
 
-       private void sendBuffer(int bufferSize, CheckpointedInputGate gate, int 
channelId) throws Exception {
-               TestInputChannel channel = (TestInputChannel) 
gate.getChannel(channelId);
-               channel.read(TestBufferFactory.createBuffer(bufferSize));
-               while (gate.pollNext().isPresent()) {
+       private void send(Buffer buffer, InputChannel channel, 
CheckpointedInputGate checkpointedGate) throws IOException, 
InterruptedException {
+               if (channel instanceof TestInputChannel) {
+                       ((TestInputChannel) channel).read(buffer);
+               } else if (channel instanceof RemoteInputChannel) {
+                       ((RemoteInputChannel) channel).onBuffer(buffer, 0, 0);
+               } else {
+                       throw new IllegalArgumentException("Unknown channel 
type: " + channel);
+               }
+               while (checkpointedGate.pollNext().isPresent()) {
                }
        }
 
+       private Buffer withTimeout(long alignmentTimeout) throws IOException {
+               return barrier(1, System.currentTimeMillis(), 
alignedWithTimeout(getDefault(), alignmentTimeout));
+       }
+
+       private Buffer barrier(long barrierId, long barrierTimestamp, 
CheckpointOptions options) throws IOException {
+               CheckpointBarrier checkpointBarrier = new CheckpointBarrier(
+                       barrierId,
+                       barrierTimestamp,
+                       options);
+               return toBuffer(
+                       checkpointBarrier,
+                       
checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
+       }
+
        private static SingleCheckpointBarrierHandler 
barrierHandler(SingleInputGate inputGate, AbstractInvokable target) {
                return barrierHandler(inputGate, target, new 
RecordingChannelStateWriter());
        }
@@ -683,38 +650,6 @@ public class AlternatingControllerTest {
                                new UnalignedController(new 
TestSubtaskCheckpointCoordinator(stateWriter), inputGate)));
        }
 
-       private Buffer barrier(long barrierId, CheckpointType checkpointType) 
throws IOException {
-               return barrier(barrierId, checkpointType, 
System.currentTimeMillis());
-       }
-
-       private Buffer barrier(long barrierId, CheckpointType checkpointType, 
long barrierTimestamp) throws IOException {
-               return barrier(barrierId, checkpointType, barrierTimestamp, 0);
-       }
-
-       private Buffer barrier(long barrierId, CheckpointType checkpointType, 
long barrierTimestamp, long alignmentTimeout) throws IOException {
-               CheckpointBarrier checkpointBarrier = checkpointBarrier(
-                       barrierId,
-                       checkpointType,
-                       barrierTimestamp,
-                       alignmentTimeout);
-               return toBuffer(
-                       checkpointBarrier,
-                       
checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
-       }
-
-       private CheckpointBarrier checkpointBarrier(long barrierId, 
CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) {
-               CheckpointOptions options = CheckpointOptions.create(
-                       checkpointType,
-                       CheckpointStorageLocationReference.getDefault(),
-                       true,
-                       true,
-                       alignmentTimeout);
-               return new CheckpointBarrier(
-                       barrierId,
-                       barrierTimestamp,
-                       options);
-       }
-
        private static CheckpointedInputGate buildGate(AbstractInvokable 
target, int numChannels) {
                SingleInputGate gate = new 
SingleInputGateBuilder().setNumberOfChannels(numChannels).build();
                TestInputChannel[] channels = new TestInputChannel[numChannels];
@@ -751,4 +686,30 @@ public class AlternatingControllerTest {
                MailboxProcessor mailboxProcessor = new MailboxProcessor();
                return new CheckpointedInputGate(gate, barrierHandler(gate, 
target, channelStateWriter), mailboxProcessor.getMainMailboxExecutor());
        }
+
+       private static void assertAnnouncement(CheckpointedInputGate gate) throws IOException, InterruptedException {
+               assertEvent(gate, EventAnnouncement.class);
+       }
+
+       private static void assertBarrier(CheckpointedInputGate gate) throws IOException, InterruptedException {
+               assertEvent(gate, CheckpointBarrier.class);
+       }
+
+       private static <T extends RuntimeEvent> void assertEvent(CheckpointedInputGate gate, Class<T> clazz) throws IOException, InterruptedException {
+               Optional<BufferOrEvent> bufferOrEvent = assertPoll(gate);
+               assertTrue("expected event, got data buffer on " + bufferOrEvent.get().getChannelInfo(), bufferOrEvent.get().isEvent());
+               assertEquals(clazz, bufferOrEvent.get().getEvent().getClass());
+       }
+
+       private static <T extends RuntimeEvent> void assertData(CheckpointedInputGate gate) throws IOException, InterruptedException {
+               Optional<BufferOrEvent> bufferOrEvent = assertPoll(gate);
+               assertTrue("expected data, got " + bufferOrEvent.get().getEvent() + "  on " + bufferOrEvent.get().getChannelInfo(), bufferOrEvent.get().isBuffer());
+       }
+
+       private static Optional<BufferOrEvent> assertPoll(CheckpointedInputGate gate) throws IOException, InterruptedException {
+               Optional<BufferOrEvent> bufferOrEvent = gate.pollNext();
+               assertTrue("empty gate", bufferOrEvent.isPresent());
+               return bufferOrEvent;
+       }
+
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
index 6f449ef..7d401d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
@@ -84,7 +85,7 @@ public class InputProcessorUtilTest {
                        for (IndexedInputGate inputGate : allInputGates) {
                                for (int channelId = 0; channelId < inputGate.getNumberOfInputChannels(); channelId++) {
                                        barrierHandler.processBarrier(
-                                               new CheckpointBarrier(1, 42, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0)),
+                                               new CheckpointBarrier(1, 42, CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())),
                                                new InputChannelInfo(inputGate.getGateIndex(), channelId));
                                }
                        }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index e916329..319c4f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest {
                        deserializers);
 
                inputGate.sendEvent(
-                       new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()),
+                       new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned()),
                        channelId);
                inputGate.sendElement(new StreamRecord<>(42L), channelId);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
index 82ac3fe..1291457 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java
@@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest {
        }
 
        private static CheckpointBarrier checkpoint(int checkpointId) {
-               return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut());
+               return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned());
        }
 
        private static CancelCheckpointMarker cancel(int checkpointId) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
index 7604746..7b64ee7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -39,7 +38,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import 
org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
@@ -59,8 +57,6 @@ import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
-import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -620,7 +616,7 @@ public class UnalignedControllerTest {
                        new CheckpointBarrier(
                                checkpointId,
                                timestamp,
-                               new CheckpointOptions(CHECKPOINT, getDefault(), 
true, true, NO_ALIGNMENT_TIME_OUT)),
+                               CheckpointOptions.unaligned(getDefault())),
                        new InputChannelInfo(0, channel));
        }
 
@@ -734,7 +730,7 @@ public class UnalignedControllerTest {
        }
 
        private CheckpointBarrier buildCheckpointBarrier(long id) {
-               return new CheckpointBarrier(id, 0, new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT));
+               return new CheckpointBarrier(id, 0, CheckpointOptions.unaligned(getDefault()));
        }
 
        // 
------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
index 5f912ab..d97bca2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
@@ -221,7 +221,7 @@ public class 
MultipleInputStreamTaskChainedSourcesCheckpointingTest {
 
        private CheckpointBarrier 
createBarrier(StreamTaskMailboxTestHarness<String> testHarness) {
                StreamConfig config = 
testHarness.getStreamTask().getConfiguration();
-               CheckpointOptions checkpointOptions = CheckpointOptions.create(
+               CheckpointOptions checkpointOptions = 
CheckpointOptions.forConfig(
                                CheckpointType.CHECKPOINT,
                                CheckpointStorageLocationReference.getDefault(),
                                config.isExactlyOnceCheckpointMode(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 304e448..b83e2de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -104,8 +104,9 @@ public class SubtaskCheckpointCoordinatorTest {
                MockWriter writer = new MockWriter();
                SubtaskCheckpointCoordinator coordinator = 
coordinator(unalignedCheckpointEnabled, writer);
                CheckpointStorageLocationReference locationReference = 
CheckpointStorageLocationReference.getDefault();
-               CheckpointOptions options = new CheckpointOptions(checkpointType, locationReference, true, unalignedCheckpointEnabled, 0);
-               coordinator.initCheckpoint(1L, options);
+               coordinator.initCheckpoint(1L, unalignedCheckpointEnabled ?
+                       CheckpointOptions.unaligned(locationReference) :
+                       CheckpointOptions.alignedNoTimeout(checkpointType, locationReference));
                return writer.started;
        }
 

Reply via email to