Repository: flink
Updated Branches:
  refs/heads/master f025c455f -> 07ab9f453


[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a79dd5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a79dd5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a79dd5f

Branch: refs/heads/master
Commit: 0a79dd5fb165c714bdac8de770b45d9406177401
Parents: f025c45
Author: Stephan Ewen <[email protected]>
Authored: Sun Oct 23 18:41:32 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 21:14:39 2016 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointMetrics.java   |  13 +-
 .../io/network/api/CancelCheckpointMarker.java  |  77 +++
 .../api/serialization/EventSerializer.java      |  57 +-
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../partition/PipelinedSubpartition.java        |   3 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  11 +
 .../api/serialization/EventSerializerTest.java  |  45 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  37 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   5 +
 .../streaming/runtime/io/BarrierBuffer.java     | 215 +++++--
 .../streaming/runtime/io/BarrierTracker.java    | 147 +++--
 .../streaming/runtime/io/BufferSpiller.java     |   3 +-
 .../runtime/io/RecordWriterOutput.java          |   4 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  32 +-
 .../streaming/runtime/tasks/StreamTask.java     |  12 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 572 +++++++++++++++++--
 .../runtime/io/BarrierTrackerTest.java          | 130 ++++-
 17 files changed, 1144 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
index 4155290..f72b00e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java
@@ -20,16 +20,17 @@ package org.apache.flink.runtime.checkpoint;
 
 import java.io.Serializable;
 
+/**
+ * A collection of simple metrics, around the triggering of a checkpoint.
+ */
 public class CheckpointMetrics implements Serializable {
 
-       /**
-        * The number of bytes that were buffered during the checkpoint 
alignment phase
-        */
+       private static final long serialVersionUID = 1L;
+
+       /** The number of bytes that were buffered during the checkpoint 
alignment phase */
        private long bytesBufferedInAlignment;
 
-       /**
-        * The duration (in nanoseconds) that the stream alignment for the 
checkpoint took
-        */
+       /** The duration (in nanoseconds) that the stream alignment for the 
checkpoint took */
        private long alignmentDurationNanos;
 
        /* The duration (in milliseconds) of the synchronous part of the 
operator checkpoint */

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
new file mode 100644
index 0000000..52a2517
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * The CancelCheckpointMarker travels through the data streams, similar to the 
{@link CheckpointBarrier},
+ * but signals that a certain checkpoint should be canceled. Any in-progress 
alignment for that
+ * checkpoint needs to be canceled and regular processing should be resumed.
+ */
+public class CancelCheckpointMarker extends RuntimeEvent {
+
+       /** The id of the checkpoint to be canceled */
+       private final long checkpointId;
+
+       public CancelCheckpointMarker(long checkpointId) {
+               this.checkpointId = checkpointId;
+       }
+
+       public long getCheckpointId() {
+               return checkpointId;
+       }
+
+       // 
------------------------------------------------------------------------
+       // These known and common event go through special code paths, rather 
than
+       // through generic serialization 
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               throw new UnsupportedOperationException("this method should 
never be called");
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               throw new UnsupportedOperationException("this method should 
never be called");
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return (int) (checkpointId ^ (checkpointId >>> 32));
+       }
+
+       @Override
+       public boolean equals(Object other) {
+               return other != null && 
+                               other.getClass() == 
CancelCheckpointMarker.class &&
+                               this.checkpointId == ((CancelCheckpointMarker) 
other).checkpointId;
+       }
+
+       @Override
+       public String toString() {
+               return "CancelCheckpointMarker " + checkpointId;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index a34f8cf..0bc3b28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -38,7 +39,7 @@ import java.nio.ByteOrder;
  * Utility class to serialize and deserialize task events.
  */
 public class EventSerializer {
-       
+
        private static final int END_OF_PARTITION_EVENT = 0;
 
        private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -46,17 +47,19 @@ public class EventSerializer {
        private static final int END_OF_SUPERSTEP_EVENT = 2;
 
        private static final int OTHER_EVENT = 3;
-       
+
+       private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
+
        // 
------------------------------------------------------------------------
-       
-       public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+
+       public static ByteBuffer toSerializedEvent(AbstractEvent event) throws 
IOException {
                final Class<?> eventClass = event.getClass();
                if (eventClass == EndOfPartitionEvent.class) {
                        return ByteBuffer.wrap(new byte[] { 0, 0, 0, 
END_OF_PARTITION_EVENT });
                }
                else if (eventClass == CheckpointBarrier.class) {
                        CheckpointBarrier barrier = (CheckpointBarrier) event;
-                       
+
                        ByteBuffer buf = ByteBuffer.allocate(20);
                        buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
                        buf.putLong(4, barrier.getId());
@@ -66,32 +69,39 @@ public class EventSerializer {
                else if (eventClass == EndOfSuperstepEvent.class) {
                        return ByteBuffer.wrap(new byte[] { 0, 0, 0, 
END_OF_SUPERSTEP_EVENT });
                }
+               else if (eventClass == CancelCheckpointMarker.class) {
+                       CancelCheckpointMarker marker = 
(CancelCheckpointMarker) event;
+
+                       ByteBuffer buf = ByteBuffer.allocate(12);
+                       buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
+                       buf.putLong(4, marker.getCheckpointId());
+                       return buf;
+               }
                else {
                        try {
                                final DataOutputSerializer serializer = new 
DataOutputSerializer(128);
                                serializer.writeInt(OTHER_EVENT);
                                serializer.writeUTF(event.getClass().getName());
                                event.write(serializer);
-       
                                return serializer.wrapAsByteBuffer();
                        }
                        catch (IOException e) {
-                               throw new RuntimeException("Error while 
serializing event.", e);
+                               throw new IOException("Error while serializing 
event.", e);
                        }
                }
        }
 
-       public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, 
ClassLoader classLoader) {
+       public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, 
ClassLoader classLoader) throws IOException {
                if (buffer.remaining() < 4) {
-                       throw new RuntimeException("Incomplete event");
+                       throw new IOException("Incomplete event");
                }
-               
+
                final ByteOrder bufferOrder = buffer.order();
                buffer.order(ByteOrder.BIG_ENDIAN);
-               
+
                try {
                        int type = buffer.getInt();
-                               
+
                        if (type == END_OF_PARTITION_EVENT) {
                                return EndOfPartitionEvent.INSTANCE;
                        }
@@ -103,35 +113,38 @@ public class EventSerializer {
                        else if (type == END_OF_SUPERSTEP_EVENT) {
                                return EndOfSuperstepEvent.INSTANCE;
                        }
+                       else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
+                               long id = buffer.getLong();
+                               return new CancelCheckpointMarker(id);
+                       }
                        else if (type == OTHER_EVENT) {
                                try {
                                        final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
-               
                                        final String className = 
deserializer.readUTF();
-               
+
                                        final Class<? extends AbstractEvent> 
clazz;
                                        try {
                                                clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
                                        }
                                        catch (ClassNotFoundException e) {
-                                               throw new 
RuntimeException("Could not load event class '" + className + "'.", e);
+                                               throw new IOException("Could 
not load event class '" + className + "'.", e);
                                        }
                                        catch (ClassCastException e) {
-                                               throw new RuntimeException("The 
class '" + className + "' is not a valid subclass of '"
+                                               throw new IOException("The 
class '" + className + "' is not a valid subclass of '"
                                                                + 
AbstractEvent.class.getName() + "'.", e);
                                        }
-               
+
                                        final AbstractEvent event = 
InstantiationUtil.instantiate(clazz, AbstractEvent.class);
                                        event.read(deserializer);
-               
+
                                        return event;
                                }
                                catch (Exception e) {
-                                       throw new RuntimeException("Error while 
deserializing or instantiating event.", e);
+                                       throw new IOException("Error while 
deserializing or instantiating event.", e);
                                }
                        } 
                        else {
-                               throw new RuntimeException("Corrupt byte stream 
for event");
+                               throw new IOException("Corrupt byte stream for 
event");
                        }
                }
                finally {
@@ -143,7 +156,7 @@ public class EventSerializer {
        // Buffer helpers
        // 
------------------------------------------------------------------------
 
-       public static Buffer toBuffer(AbstractEvent event) {
+       public static Buffer toBuffer(AbstractEvent event) throws IOException {
                final ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(event);
 
                MemorySegment data = 
MemorySegmentFactory.wrap(serializedEvent.array());
@@ -154,7 +167,7 @@ public class EventSerializer {
                return buffer;
        }
 
-       public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader 
classLoader) {
+       public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader 
classLoader) throws IOException {
                return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index b97bd82..93775df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -440,7 +440,7 @@ abstract class NettyMessage {
                }
 
                @Override
-               public void readFrom(ByteBuf buffer) {
+               public void readFrom(ByteBuf buffer) throws IOException {
                        // TODO Directly deserialize fromNetty's buffer
                        int length = buffer.readInt();
                        ByteBuffer serializedEvent = 
ByteBuffer.allocate(length);

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 266f581..3981a26 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public void finish() {
+       public void finish() throws IOException {
                final NotificationListener listener;
 
                synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index b0c3730..c91dcf2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -61,6 +61,17 @@ public interface StatefulTask {
        void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) 
throws Exception;
 
        /**
+        * Aborts a checkpoint as the result of receiving possibly some 
checkpoint barriers,
+        * but at least one {@link 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+        * 
+        * <p>This requires implementing tasks to forward a
+        * {@link 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their 
outputs.
+        * 
+        * @param checkpointId The ID of the checkpoint to be aborted.
+        */
+       void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+
+       /**
         * Invoked when a checkpoint has been completed, i.e., when the 
checkpoint coordinator has received
         * the notification from all participating tasks.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ddfd758..d47a0b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -28,34 +29,30 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class EventSerializerTest {
 
        @Test
-       public void testSerializeDeserializeEvent() {
-               try {
-                       AbstractEvent[] events = {
-                                       EndOfPartitionEvent.INSTANCE,
-                                       EndOfSuperstepEvent.INSTANCE,
-                                       new CheckpointBarrier(1678L, 4623784L),
-                                       new TestTaskEvent(Math.random(), 
12361231273L)
-                       };
-                       
-                       for (AbstractEvent evt : events) {
-                               ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(evt);
-                               assertTrue(serializedEvent.hasRemaining());
-
-                               AbstractEvent deserialized = 
-                                               
EventSerializer.fromSerializedEvent(serializedEvent, 
getClass().getClassLoader());
-                               assertNotNull(deserialized);
-                               assertEquals(evt, deserialized);
-                       }
-                       
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+       public void testSerializeDeserializeEvent() throws Exception {
+               AbstractEvent[] events = {
+                               EndOfPartitionEvent.INSTANCE,
+                               EndOfSuperstepEvent.INSTANCE,
+                               new CheckpointBarrier(1678L, 4623784L),
+                               new TestTaskEvent(Math.random(), 12361231273L),
+                               new CancelCheckpointMarker(287087987329842L)
+               };
+               
+               for (AbstractEvent evt : events) {
+                       ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(evt);
+                       assertTrue(serializedEvent.hasRemaining());
+
+                       AbstractEvent deserialized = 
+                                       
EventSerializer.fromSerializedEvent(serializedEvent, 
getClass().getClassLoader());
+                       assertNotNull(deserialized);
+                       assertEquals(evt, deserialized);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b195858..e40b2df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -454,24 +454,20 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) {
-                       try {
-                               ByteStreamStateHandle byteStreamStateHandle = 
new TestByteStreamStateHandleDeepCompare(
-                                               
String.valueOf(UUID.randomUUID()),
-                                               
InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
-
-                               ChainedStateHandle<StreamStateHandle> 
chainedStateHandle =
-                                               new 
ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle));
-                               SubtaskState checkpointStateHandles =
-                                               new 
SubtaskState(chainedStateHandle, null, null, null, null, 0L);
-
-                               getEnvironment().acknowledgeCheckpoint(
-                                               new 
CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
-                                               checkpointStateHandles);
-                               return true;
-                       } catch (Exception ex) {
-                               throw new RuntimeException(ex);
-                       }
+               public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) throws Exception {
+                       ByteStreamStateHandle byteStreamStateHandle = new 
TestByteStreamStateHandleDeepCompare(
+                                       String.valueOf(UUID.randomUUID()),
+                                       
InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
+
+                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle =
+                                       new 
ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle));
+                       SubtaskState checkpointStateHandles =
+                                       new SubtaskState(chainedStateHandle, 
null, null, null, null, 0L);
+
+                       getEnvironment().acknowledgeCheckpoint(
+                                       new 
CheckpointMetaData(checkpointMetaData.getCheckpointId(), -1, 0L, 0L, 0L, 0L),
+                                       checkpointStateHandles);
+                       return true;
                }
 
                @Override
@@ -480,6 +476,11 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {
+                       throw new UnsupportedOperationException("should not be 
called!");
+               }
+
+               @Override
                public void notifyCheckpointComplete(long checkpointId) {
                        if (completedCheckpoints++ > 
NUM_CHECKPOINTS_TO_COMPLETE) {
                                completedCheckpointsLatch.countDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ed107c7..ab29bb0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -232,6 +232,11 @@ public class TaskAsyncCallTest {
                }
 
                @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {
+                       throw new UnsupportedOperationException("Should not be 
called");
+               }
+
+               @Override
                public void notifyCheckpointComplete(long checkpointId) {
                        if (checkpointId != lastCheckpointId && this.error == 
null) {
                                this.error = new Exception("calls out of 
order");

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7f01129..3ccb575 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -20,11 +20,13 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +72,8 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        /** The ID of the checkpoint for which we expect barriers */
        private long currentCheckpointId = -1L;
 
-       /** The number of received barriers (= number of blocked/buffered 
channels) */
+       /** The number of received barriers (= number of blocked/buffered 
channels)
+        * IMPORTANT: A canceled checkpoint must always have 0 barriers */
        private int numBarriersReceived;
 
        /** The number of already closed channels */
@@ -96,7 +99,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                this.inputGate = inputGate;
                this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
                this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
-               
+
                this.bufferSpiller = new BufferSpiller(ioManager, 
inputGate.getPageSize());
                this.queuedBuffered = new 
ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
        }
@@ -135,11 +138,12 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                                
processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
                                        }
                                }
+                               else if (next.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                                       
processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+                               }
                                else {
                                        if (next.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                               numClosedChannels++;
-                                               // no chance to complete this 
checkpoint
-                                               releaseBlocks();
+                                               
processEndOfPartition(next.getChannelIndex());
                                        }
                                        return next;
                                }
@@ -147,7 +151,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        else if (!endOfStream) {
                                // end of input stream. stream continues with 
the buffered data
                                endOfStream = true;
-                               releaseBlocks();
+                               releaseBlocksAndResetBarriers();
                                return getNextNonBlocked();
                        }
                        else {
@@ -156,7 +160,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
                }
        }
-       
+
        private void completeBufferedSequence() throws IOException {
                currentBuffered.cleanup();
                currentBuffered = queuedBuffered.pollFirst();
@@ -164,72 +168,171 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        currentBuffered.open();
                }
        }
-       
+
        private void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
                final long barrierId = receivedBarrier.getId();
 
+               // fast path for single channel cases
+               if (totalNumberOfInputChannels == 1) {
+                       if (barrierId > currentCheckpointId) {
+                               // new checkpoint
+                               currentCheckpointId = barrierId;
+                               notifyCheckpoint(receivedBarrier);
+                       }
+                       return;
+               }
+
+               // -- general code path for multiple input channels --
+
                if (numBarriersReceived > 0) {
-                       // subsequent barrier of a checkpoint.
+                       // this is only true if some alignment is already 
progress and was not canceled
+
                        if (barrierId == currentCheckpointId) {
                                // regular case
                                onBarrier(channelIndex);
                        }
                        else if (barrierId > currentCheckpointId) {
-                               // we did not complete the current checkpoint
+                               // we did not complete the current checkpoint, 
another started before
                                LOG.warn("Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
                                                "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
 
-                               releaseBlocks();
-                               currentCheckpointId = barrierId;
-                               onBarrier(channelIndex);
+                               // let the task know we are not completing this
+                               notifyAbort(currentCheckpointId);
 
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Starting stream alignment 
for checkpoint {}", barrierId);
-                               }
-                               startOfAlignmentTimestamp = System.nanoTime();
+                               // abort the current checkpoint
+                               releaseBlocksAndResetBarriers();
+
+                               // begin a the new checkpoint
+                               beginNewAlignment(barrierId, channelIndex);
                        }
                        else {
-                               // ignore trailing barrier from aborted 
checkpoint
+                               // ignore trailing barrier from an earlier 
checkpoint (obsolete now)
                                return;
                        }
-                       
                }
                else if (barrierId > currentCheckpointId) {
                        // first barrier of a new checkpoint
-                       currentCheckpointId = barrierId;
-                       onBarrier(channelIndex);
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Starting stream alignment for 
checkpoint {}", barrierId);
-                       }
-                       startOfAlignmentTimestamp = System.nanoTime();
+                       beginNewAlignment(barrierId, channelIndex);
                }
                else {
-                       // trailing barrier from previous (skipped) checkpoint
+                       // either the current checkpoint was canceled 
(numBarriers == 0) or
+                       // this barrier is from an old subsumed checkpoint
                        return;
                }
 
-               // check if we have all barriers
+               // check if we have all barriers - since canceled checkpoints 
always have zero barriers
+               // this can only happen on a non canceled checkpoint
                if (numBarriersReceived + numClosedChannels == 
totalNumberOfInputChannels) {
+                       // actually trigger checkpoint
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received all barrier, triggering 
checkpoint {} at {}",
+                               LOG.debug("Received all barriers, triggering 
checkpoint {} at {}",
                                                receivedBarrier.getId(), 
receivedBarrier.getTimestamp());
                        }
 
-                       releaseBlocks();
+                       releaseBlocksAndResetBarriers();
+                       notifyCheckpoint(receivedBarrier);
+               }
+       }
+
+       private void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
+               final long barrierId = cancelBarrier.getCheckpointId();
+
+               // fast path for single channel cases
+               if (totalNumberOfInputChannels == 1) {
+                       if (barrierId > currentCheckpointId) {
+                               // new checkpoint
+                               currentCheckpointId = barrierId;
+                               notifyAbort(barrierId);
+                       }
+                       return;
+               }
+
+               // -- general code path for multiple input channels --
+
+               if (numBarriersReceived > 0) {
+                       // this is only true if some alignment is in progress 
and nothing was canceled
+
+                       if (barrierId == currentCheckpointId) {
+                               // cancel this alignment
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Checkpoint {} canceled, 
aborting alignment", barrierId);
+                               }
+
+                               releaseBlocksAndResetBarriers();
+                               notifyAbort(barrierId);
+                       }
+                       else if (barrierId > currentCheckpointId) {
+                               // we canceled the next which also cancels the 
current
+                               LOG.warn("Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
+                                               "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
+
+                               // this stops the current alignment
+                               releaseBlocksAndResetBarriers();
+
+                               // the next checkpoint starts as canceled
+                               currentCheckpointId = barrierId;
+                               startOfAlignmentTimestamp = 0L;
+                               latestAlignmentDurationNanos = 0L;
+                               notifyAbort(barrierId);
+                       }
+
+                       // else: ignore trailing (cancellation) barrier from an 
earlier checkpoint (obsolete now)
+
+               }
+               else if (barrierId > currentCheckpointId) {
+                       // first barrier of a new checkpoint is directly a 
cancellation
+
+                       // by setting the currentCheckpointId to this 
checkpoint while keeping the numBarriers
+                       // at zero means that no checkpoint barrier can start a 
new alignment
+                       currentCheckpointId = barrierId;
 
-                       if (toNotifyOnCheckpoint != null) {
-                               CheckpointMetaData checkpointMetaData =
-                                               new 
CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
-                               checkpointMetaData.
-                                               
setBytesBufferedInAlignment(bufferSpiller.getBytesWritten()).
-                                               
setAlignmentDurationNanos(latestAlignmentDurationNanos);
+                       startOfAlignmentTimestamp = 0L;
+                       latestAlignmentDurationNanos = 0L;
 
-                               
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Checkpoint {} canceled, skipping 
alignment", barrierId);
                        }
+
+                       notifyAbort(barrierId);
                }
+
+               // else: trailing barrier from either
+               //   - a previous (subsumed) checkpoint
+               //   - the current checkpoint if it was already canceled
        }
-       
+
+       private void processEndOfPartition(int channel) throws Exception {
+               numClosedChannels++;
+
+               if (numBarriersReceived > 0) {
+                       // let the task know we skip a checkpoint
+                       notifyAbort(currentCheckpointId);
+
+                       // no chance to complete this checkpoint
+                       releaseBlocksAndResetBarriers();
+               }
+       }
+
+       private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) 
throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       CheckpointMetaData checkpointMetaData =
+                                       new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+
+                       checkpointMetaData
+                                       
.setBytesBufferedInAlignment(bufferSpiller.getBytesWritten())
+                                       
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
+
+                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+               }
+       }
+
+       private void notifyAbort(long checkpointId) throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+               }
+       }
+
+
        @Override
        public void registerCheckpointEventHandler(StatefulTask 
toNotifyOnCheckpoint) {
                if (this.toNotifyOnCheckpoint == null) {
@@ -239,7 +342,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        throw new IllegalStateException("BarrierBuffer already 
has a registered checkpoint notifyee");
                }
        }
-       
+
        @Override
        public boolean isEmpty() {
                return currentBuffered == null;
@@ -254,8 +357,20 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                for (BufferSpiller.SpilledBufferOrEventSequence seq : 
queuedBuffered) {
                        seq.cleanup();
                }
+               queuedBuffered.clear();
        }
-       
+
+       private void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
+               currentCheckpointId = checkpointId;
+               onBarrier(channelIndex);
+
+               startOfAlignmentTimestamp = System.nanoTime();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId);
+               }
+       }
+
        /**
         * Checks whether the channel with the given index is blocked.
         * 
@@ -265,7 +380,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private boolean isBlocked(int channelIndex) {
                return blockedChannels[channelIndex];
        }
-       
+
        /**
         * Blocks the given channel index, from which a barrier has been 
received.
         * 
@@ -274,28 +389,28 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private void onBarrier(int channelIndex) throws IOException {
                if (!blockedChannels[channelIndex]) {
                        blockedChannels[channelIndex] = true;
+
                        numBarriersReceived++;
-                       
+
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Received barrier from channel " + 
channelIndex);
                        }
                }
                else {
-                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint and input stream");
+                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint on input " + channelIndex);
                }
        }
 
        /**
-        * Releases the blocks on all channels. Makes sure the just written data
-        * is the next to be consumed.
+        * Releases the blocks on all channels and resets the barrier count.
+        * Makes sure the just written data is the next to be consumed.
         */
-       private void releaseBlocks() throws IOException {
+       private void releaseBlocksAndResetBarriers() throws IOException {
                LOG.debug("End of stream alignment, feeding buffered data 
back");
 
                for (int i = 0; i < blockedChannels.length; i++) {
                        blockedChannels[i] = false;
                }
-               numBarriersReceived = 0;
 
                if (currentBuffered == null) {
                        // common case: no more buffered data
@@ -317,9 +432,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
                }
 
-               final long now = System.nanoTime();
+               // the next barrier that comes must assume it is the first
+               numBarriersReceived = 0;
+
                if (startOfAlignmentTimestamp > 0) {
-                       latestAlignmentDurationNanos = now - 
startOfAlignmentTimestamp;
+                       latestAlignmentDurationNanos = System.nanoTime() - 
startOfAlignmentTimestamp;
                        startOfAlignmentTimestamp = 0;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 86945a8..cce7cb4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -34,9 +35,9 @@ import java.util.ArrayDeque;
  * 
  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the 
input
  * channels that have sent barriers, so it cannot be used to gain 
"exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing 
guarantees.</p>
+ * guarantees. It can, however, be used to gain "at least once" processing 
guarantees.
  * 
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have 
higher checkpoint IDs.</p>
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have 
higher checkpoint IDs.
  */
 @Internal
 public class BarrierTracker implements CheckpointBarrierHandler {
@@ -74,15 +75,20 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        public BufferOrEvent getNextNonBlocked() throws Exception {
                while (true) {
                        BufferOrEvent next = inputGate.getNextBufferOrEvent();
-                       if (next == null) {
-                               return null;
-                       }
-                       else if (next.isBuffer() || next.getEvent().getClass() 
!= CheckpointBarrier.class) {
+                       if (next == null || next.isBuffer()) {
+                               // buffer or input exhausted
                                return next;
                        }
-                       else {
+                       else if (next.getEvent().getClass() == 
CheckpointBarrier.class) {
                                processBarrier((CheckpointBarrier) 
next.getEvent());
                        }
+                       else if (next.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                               
processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+                       }
+                       else {
+                               // some other event
+                               return next;
+                       }
                }
        }
 
@@ -113,23 +119,15 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        }
 
        private void processBarrier(CheckpointBarrier receivedBarrier) throws 
Exception {
+               final long barrierId = receivedBarrier.getId();
+
                // fast path for single channel trackers
                if (totalNumberOfInputChannels == 1) {
-                       if (toNotifyOnCheckpoint != null) {
-                               CheckpointMetaData checkpointMetaData =
-                                               new 
CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
-
-                               checkpointMetaData.
-                                               setBytesBufferedInAlignment(0L).
-                                               setAlignmentDurationNanos(0L);
-
-                               
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
-                       }
+                       notifyCheckpoint(barrierId, 
receivedBarrier.getTimestamp());
                        return;
                }
 
                // general path for multiple input channels
-               final long barrierId = receivedBarrier.getId();
 
                // find the checkpoint barrier in the queue of bending barriers
                CheckpointBarrierCount cbc = null;
@@ -147,22 +145,16 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                        // add one to the count to that barrier and check for 
completion
                        int numBarriersNew = cbc.incrementBarrierCount();
                        if (numBarriersNew == totalNumberOfInputChannels) {
-                               // checkpoint can be triggered
+                               // checkpoint can be triggered (or is aborted 
and all barriers have been seen)
                                // first, remove this checkpoint and all all 
prior pending
                                // checkpoints (which are now subsumed)
                                for (int i = 0; i <= pos; i++) {
                                        pendingCheckpoints.pollFirst();
                                }
-                               
+
                                // notify the listener
-                               if (toNotifyOnCheckpoint != null) {
-                                       CheckpointMetaData checkpointMetaData =
-                                                       new 
CheckpointMetaData(receivedBarrier.getId(), receivedBarrier.getTimestamp());
-                                       checkpointMetaData.
-                                                       
setBytesBufferedInAlignment(0L).
-                                                       
setAlignmentDurationNanos(0L);
-
-                                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+                               if (!cbc.isAborted()) {
+                                       
notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
                                }
                        }
                }
@@ -183,45 +175,110 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                }
        }
 
+       private void processCheckpointAbortBarrier(CancelCheckpointMarker 
barrier) throws Exception {
+               final long checkpointId = barrier.getCheckpointId();
+
+               // fast path for single channel trackers
+               if (totalNumberOfInputChannels == 1) {
+                       notifyAbort(checkpointId);
+                       return;
+               }
+
+               // -- general path for multiple input channels --
+
+               // find the checkpoint barrier in the queue of pending barriers
+               // while doing this we "abort" all checkpoints before that one
+               CheckpointBarrierCount cbc;
+               while ((cbc = pendingCheckpoints.peekFirst()) != null && 
cbc.checkpointId() < checkpointId) {
+                       pendingCheckpoints.removeFirst();
+               }
+
+               if (cbc != null && cbc.checkpointId() == checkpointId) {
+                       // make sure the checkpoint is remembered as aborted
+                       if (cbc.markAborted()) {
+                               // this was the first time the checkpoint was 
aborted - notify
+                               notifyAbort(checkpointId);
+                       }
+
+                       // we still count the barriers to be able to remove the 
entry once all barriers have been seen
+                       if (cbc.incrementBarrierCount() == 
totalNumberOfInputChannels) {
+                               // we can remove this entry
+                               pendingCheckpoints.removeFirst();
+                       }
+               }
+               else {
+                       notifyAbort(checkpointId);
+
+                       // first barrier for this checkpoint - remember it as 
aborted
+                       // since we polled away all entries with lower 
checkpoint IDs
+                       // this entry will become the new first entry
+                       if (pendingCheckpoints.size() < 
MAX_CHECKPOINTS_TO_TRACK) {
+                               CheckpointBarrierCount abortedMarker = new 
CheckpointBarrierCount(checkpointId);
+                               abortedMarker.markAborted();
+                               pendingCheckpoints.addFirst(abortedMarker);
+                       }
+               }
+       }
+
+       private void notifyCheckpoint(long checkpointId, long timestamp) throws 
Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
+
+                       checkpointMetaData
+                                       .setBytesBufferedInAlignment(0L)
+                                       .setAlignmentDurationNanos(0L);
+
+                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData);
+               }
+       }
+
+       private void notifyAbort(long checkpointId) throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
         * Simple class for a checkpoint ID with a barrier counter.
         */
        private static final class CheckpointBarrierCount {
-               
+
                private final long checkpointId;
-               
+
                private int barrierCount;
-               
-               private CheckpointBarrierCount(long checkpointId) {
+
+               private boolean aborted;
+
+               CheckpointBarrierCount(long checkpointId) {
                        this.checkpointId = checkpointId;
                        this.barrierCount = 1;
                }
 
+               public long checkpointId() {
+                       return checkpointId;
+               }
+
                public int incrementBarrierCount() {
                        return ++barrierCount;
                }
 
-               @Override
-               public int hashCode() {
-                       return (int) ((checkpointId >>> 32) ^ checkpointId) + 
17 * barrierCount; 
+               public boolean isAborted() {
+                       return aborted;
                }
 
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof  CheckpointBarrierCount) {
-                               CheckpointBarrierCount that = 
(CheckpointBarrierCount) obj;
-                               return this.checkpointId == that.checkpointId 
&& this.barrierCount == that.barrierCount;
-                       }
-                       else {
-                               return false;
-                       }
+               public boolean markAborted() {
+                       boolean firstAbort = !this.aborted;
+                       this.aborted = true;
+                       return firstAbort;
                }
 
                @Override
                public String toString() {
-                       return String.format("checkpointID=%d, count=%d", 
checkpointId, barrierCount);
+                       return isAborted() ?
+                               String.format("checkpointID=%d - ABORTED", 
checkpointId) :
+                               String.format("checkpointID=%d, count=%d", 
checkpointId, barrierCount);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5a8a4cd..45a330b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -132,9 +132,8 @@ public class BufferSpiller {
                        }
                        else {
                                contents = 
EventSerializer.toSerializedEvent(boe.getEvent());
-                               
                        }
-                       
+
                        headBuffer.clear();
                        headBuffer.putInt(boe.getChannelIndex());
                        headBuffer.putInt(contents.remaining());

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index c3ef464..2625031 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -100,8 +100,8 @@ public class RecordWriterOutput<OUT> implements 
Output<StreamRecord<OUT>> {
                }
        }
 
-       public void broadcastEvent(AbstractEvent barrier) throws IOException, 
InterruptedException {
-               recordWriter.broadcastEvent(barrier);
+       public void broadcastEvent(AbstractEvent event) throws IOException, 
InterruptedException {
+               recordWriter.broadcastEvent(event);
        }
        
        

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 30bc377..5ea84fb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
@@ -134,15 +135,32 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> {
                }
                
        }
-       
-       
-       public void broadcastCheckpointBarrier(long id, long timestamp) throws 
IOException, InterruptedException {
-               CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp);
-               for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-                       streamOutput.broadcastEvent(barrier);
+
+
+       public void broadcastCheckpointBarrier(long id, long timestamp) throws 
IOException {
+               try {
+                       CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp);
+                       for (RecordWriterOutput<?> streamOutput : 
streamOutputs) {
+                               streamOutput.broadcastEvent(barrier);
+                       }
+               }
+               catch (InterruptedException e) {
+                       throw new IOException("Interrupted while broadcasting 
checkpoint barrier");
                }
        }
-       
+
+       public void broadcastCheckpointCancelMarker(long id) throws IOException 
{
+               try {
+                       CancelCheckpointMarker barrier = new 
CancelCheckpointMarker(id);
+                       for (RecordWriterOutput<?> streamOutput : 
streamOutputs) {
+                               streamOutput.broadcastEvent(barrier);
+                       }
+               }
+               catch (InterruptedException e) {
+                       throw new IOException("Interrupted while broadcasting 
checkpoint cancellation");
+               }
+       }
+
        public RecordWriterOutput<?>[] getStreamOutputs() {
                return streamOutputs;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c75458e..1e03a96 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -528,7 +528,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        @Override
        public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData) throws Exception {
-
                try {
                        performCheckpoint(checkpointMetaData);
                }
@@ -540,6 +539,17 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
+       @Override
+       public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+               LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+               synchronized (lock) {
+                       if (!isRunning) {
+                               return; // task no longer running: the cancel marker is not forwarded
+                       }
+                       operatorChain.broadcastCheckpointCancelMarker(checkpointId);
+               }
+       }
+
        private boolean performCheckpoint(CheckpointMetaData 
checkpointMetaData) throws Exception {
 
                LOG.debug("Starting checkpoint {} on task {}", 
checkpointMetaData.getCheckpointId(), getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index f2fc876..8754e10 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,29 +18,30 @@
 
 package org.apache.flink.streaming.runtime.io;
 
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -48,15 +49,24 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 /**
  * Tests for the behavior of the {@link BarrierBuffer}.
  */
 public class BarrierBufferTest {
 
+       private static final Random RND = new Random(); // unseeded: generated buffer contents differ between runs
+
        private static final int PAGE_SIZE = 512;
-       
+
        private static int SIZE_COUNTER = 0;
-       
+
        private static IOManager IO_MANAGER;
 
        @BeforeClass
@@ -528,36 +538,53 @@ public class BarrierBufferTest {
                        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
                        BarrierBuffer buffer = new BarrierBuffer(gate, 
IO_MANAGER);
 
-                       ValidatingCheckpointHandler handler = new 
ValidatingCheckpointHandler();
-                       buffer.registerCheckpointEventHandler(handler);
-                       handler.setNextExpectedCheckpointId(1L);
+                       StatefulTask toNotify = mock(StatefulTask.class);
+                       buffer.registerCheckpointEventHandler(toNotify);
 
-                       // checkpoint 1
+                       long startTs;
+
+                       // initial data
                        check(sequence[0], buffer.getNextNonBlocked());
                        check(sequence[1], buffer.getNextNonBlocked());
                        check(sequence[2], buffer.getNextNonBlocked());
+
+                       // align checkpoint 1
+                       startTs = System.nanoTime();
                        check(sequence[7], buffer.getNextNonBlocked());
                        assertEquals(1L, buffer.getCurrentCheckpointId());
-                       
+
+                       // checkpoint done - replay buffered
                        check(sequence[5], buffer.getNextNonBlocked());
+                       validateAlignmentTime(startTs, buffer);
+                       verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(1L)));
                        check(sequence[6], buffer.getNextNonBlocked());
+
                        check(sequence[9], buffer.getNextNonBlocked());
                        check(sequence[10], buffer.getNextNonBlocked());
 
                        // alignment of checkpoint 2
+                       startTs = System.nanoTime();
                        check(sequence[13], buffer.getNextNonBlocked());
-                       assertEquals(2L, buffer.getCurrentCheckpointId());
                        check(sequence[15], buffer.getNextNonBlocked());
 
                        // checkpoint 2 aborted, checkpoint 3 started
                        check(sequence[12], buffer.getNextNonBlocked());
                        assertEquals(3L, buffer.getCurrentCheckpointId());
+                       validateAlignmentTime(startTs, buffer);
+                       verify(toNotify).abortCheckpointOnBarrier(2L);
                        check(sequence[16], buffer.getNextNonBlocked());
+
+                       // checkpoint 3 alignment in progress
                        check(sequence[19], buffer.getNextNonBlocked());
-                       check(sequence[20], buffer.getNextNonBlocked());
-                       
+
                        // checkpoint 3 aborted (end of partition)
+                       check(sequence[20], buffer.getNextNonBlocked());
+                       verify(toNotify).abortCheckpointOnBarrier(3L);
+
+                       // replay buffered data from checkpoint 3
                        check(sequence[18], buffer.getNextNonBlocked());
+
+                       // all the remaining messages
                        check(sequence[21], buffer.getNextNonBlocked());
                        check(sequence[22], buffer.getNextNonBlocked());
                        check(sequence[23], buffer.getNextNonBlocked());
@@ -887,9 +914,9 @@ public class BarrierBufferTest {
 
                        assertNull(buffer.getNextNonBlocked());
                        assertNull(buffer.getNextNonBlocked());
-                       
+
                        buffer.cleanup();
-                       
+
                        checkNoTempFilesRemain();
                }
                catch (Exception e) {
@@ -899,26 +926,480 @@ public class BarrierBufferTest {
        }
 
+       /**
+        * Tests that end-of-partition events arriving during the alignment of a checkpoint keep that
+        * checkpoint from completing, while all buffered and remaining data is still delivered.
+        */
        @Test
-       public void testEndOfStreamWhileCheckpoint() {
+       public void testEndOfStreamWhileCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // one checkpoint
+                               createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
+
+                               // some buffers
+                               createBuffer(0), createBuffer(0), 
createBuffer(2),
+
+                               // start the checkpoint that will be incomplete
+                               createBarrier(2, 2), createBarrier(2, 0),
+                               createBuffer(0), createBuffer(2), 
createBuffer(1),
+
+                               // close channel 2 after its barrier, and channel 1 before it received a barrier
+                               createEndOfPartition(2), 
createEndOfPartition(1),
+                               createBuffer(0),
+
+                               // final end of stream
+                               createEndOfPartition(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               // data after first checkpoint
+               check(sequence[3], buffer.getNextNonBlocked());
+               check(sequence[4], buffer.getNextNonBlocked());
+               check(sequence[5], buffer.getNextNonBlocked());
+               assertEquals(1L, buffer.getCurrentCheckpointId());
+
+               // alignment of second checkpoint
+               check(sequence[10], buffer.getNextNonBlocked());
+               assertEquals(2L, buffer.getCurrentCheckpointId());
+
+               // first end-of-partition encountered: checkpoint will not be completed,
+               // so the data buffered during the alignment is replayed
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[9], buffer.getNextNonBlocked());
+               check(sequence[11], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * Tests barriers and cancellation barriers on a single input channel: barriers trigger
+        * checkpoints, cancellation barriers abort them, and no alignment time is ever recorded.
+        */
+       @Test
+       public void testSingleChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               createBuffer(0),
+                               createBarrier(1, 0),
+                               createBuffer(0),
+                               createBarrier(2, 0),
+                               createCancellationBarrier(4, 0),
+                               createBarrier(5, 0),
+                               createBuffer(0),
+                               createCancellationBarrier(6, 0),
+                               createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               // the first barrier triggers checkpoint 1
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[2], buffer.getNextNonBlocked());
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // barriers 2 and 5 trigger checkpoints, the cancellation barrier aborts checkpoint 4
+               check(sequence[6], buffer.getNextNonBlocked());
+               assertEquals(5L, buffer.getCurrentCheckpointId());
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // the cancellation barrier aborts checkpoint 6
+               check(sequence[8], buffer.getNextNonBlocked());
+               assertEquals(6L, buffer.getCurrentCheckpointId());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
                
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * Tests cancellation barriers on multiple input channels: arriving as the last barrier of an
+        * alignment, as the first barrier of a checkpoint, and as several cancellations followed by
+        * a regular barrier, interleaved with successful checkpoints.
+        */
+       @Test
+       public void testMultiChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some buffers and a successful checkpoint
+                               /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
+                               /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
+                               /* 5 */ createBuffer(2), createBuffer(1),
+                               /* 7 */ createBarrier(1, 0),
+                               /* 8 */ createBuffer(0), createBuffer(2),
+
+                               // aborted on last barrier
+                               /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
+                               /* 12 */ createBuffer(0), createBuffer(2),
+                               /* 14 */ createCancellationBarrier(2, 1),
+
+                               // successful checkpoint
+                               /* 15 */ createBuffer(2), createBuffer(1),
+                               /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
+
+                               // abort on first barrier
+                               /* 20 */ createBuffer(0), createBuffer(1),
+                               /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
+                               /* 24 */ createBuffer(0),
+                               /* 25 */ createBarrier(4, 0),
+
+                               // another successful checkpoint
+                               /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+                               /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
+                               /* 32 */ createBuffer(0), createBuffer(1),
+
+                               // abort via multiple cancellation barriers, with a regular barrier after the cancellations
+                               /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
+                               /* 36 */ createBarrier(6, 0),
+
+                               /* 37 */ createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               // successful first checkpoint, with some aligned buffers
+               check(sequence[0], buffer.getNextNonBlocked());
+               check(sequence[1], buffer.getNextNonBlocked());
+               check(sequence[2], buffer.getNextNonBlocked());
+               startTs = System.nanoTime();
+               check(sequence[5], buffer.getNextNonBlocked());
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+               validateAlignmentTime(startTs, buffer);
+
+               check(sequence[6], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[9], buffer.getNextNonBlocked());
+
+               // canceled checkpoint on last barrier
+               startTs = System.nanoTime();
+               check(sequence[12], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               validateAlignmentTime(startTs, buffer);
+               check(sequence[13], buffer.getNextNonBlocked());
+
+               // one more successful checkpoint
+               check(sequence[15], buffer.getNextNonBlocked());
+               check(sequence[16], buffer.getNextNonBlocked());
+               startTs = System.nanoTime();
+               check(sequence[20], buffer.getNextNonBlocked());
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
+               validateAlignmentTime(startTs, buffer);
+               check(sequence[21], buffer.getNextNonBlocked());
+
+               // this checkpoint gets immediately canceled
+               check(sequence[24], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // some buffers
+               check(sequence[26], buffer.getNextNonBlocked());
+               check(sequence[27], buffer.getNextNonBlocked());
+               check(sequence[28], buffer.getNextNonBlocked());
+
+               // a simple successful checkpoint
+               startTs = System.nanoTime();
+               check(sequence[32], buffer.getNextNonBlocked());
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+               validateAlignmentTime(startTs, buffer);
+               check(sequence[33], buffer.getNextNonBlocked());
+
+               // checkpoint 6 is aborted via its cancellation barriers, without recording alignment time
+               check(sequence[37], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * Tests that a cancellation barrier queued during the alignment of an earlier checkpoint
+        * aborts its checkpoint once it is replayed, and that the remaining barriers of the aborted
+        * checkpoint cause no further alignment or notifications.
+        */
+       @Test
+       public void testAbortViaQueuedBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // starting a checkpoint
+                               /* 0 */ createBuffer(1),
+                               /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
+                               /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
+
+                               // queued barrier and cancellation barrier
+                               /* 6 */ createCancellationBarrier(2, 2),
+                               /* 7 */ createBarrier(2, 1),
+
+                               // some intermediate buffers (some queued)
+                               /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+
+                               // complete initial checkpoint
+                               /* 11 */ createBarrier(1, 0),
+
+                               // some buffers (none queued, since checkpoint 
is aborted)
+                               /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
+
+                               // final barrier of aborted checkpoint
+                               /* 15 */ createBarrier(2, 0),
+
+                               // some more buffers
+                               /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               check(sequence[0], buffer.getNextNonBlocked());
+
+               // starting first checkpoint
+               startTs = System.nanoTime();
+               check(sequence[4], buffer.getNextNonBlocked());
+               check(sequence[8], buffer.getNextNonBlocked());
+
+               // finished first checkpoint
+               check(sequence[3], buffer.getNextNonBlocked());
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
+               validateAlignmentTime(startTs, buffer);
+
+               check(sequence[5], buffer.getNextNonBlocked());
+
+               // replaying the queued cancellation barrier (sequence[6]) aborts checkpoint 2
+               check(sequence[9], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               check(sequence[10], buffer.getNextNonBlocked());
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               check(sequence[16], buffer.getNextNonBlocked());
+               check(sequence[17], buffer.getNextNonBlocked());
+               check(sequence[18], buffer.getNextNonBlocked());
+
+               // no further alignment should have happened
+               assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+               // no further checkpoint (abort) notifications
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+       }
+
+       /**
+        * This tests the case where a replay of queued checkpoint barriers meets
+        * a canceled checkpoint.
+        *
+        * <p>The replayed newer checkpoint barrier must not try to cancel the
+        * already canceled checkpoint.
+        */
+       @Test
+       public void testAbortWhileHavingQueuedBarriers() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // starting a checkpoint
+                               /*  0 */ createBuffer(1),
+                               /*  1 */ createBarrier(1, 1),
+                               /*  2 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
+
+                               // queued barrier of the next checkpoint (channel 1 is blocked by checkpoint 1)
+                               /*  5 */ createBarrier(2, 1),
+
+                               // some queued buffers
+                               /*  6 */ createBuffer(2), createBuffer(1),
+
+                               // cancel the initial checkpoint
+                               /*  8 */ createCancellationBarrier(1, 0),
+
+                               // some more buffers
+                               /*  9 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
+
+                               // ignored barrier - already canceled and moved 
to next checkpoint
+                               /* 12 */ createBarrier(1, 2),
+
+                               // some more buffers
+                               /* 13 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+
+                               // complete next checkpoint regularly
+                               /* 16 */ createBarrier(2, 0), createBarrier(2, 
2),
+
+                               // some more buffers
+                               /* 18 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               check(sequence[0], buffer.getNextNonBlocked());
+
+               // starting first checkpoint
+               startTs = System.nanoTime();
+               check(sequence[2], buffer.getNextNonBlocked());
+               check(sequence[3], buffer.getNextNonBlocked());
+               check(sequence[6], buffer.getNextNonBlocked());
+
+               // cancelled by cancellation barrier
+               check(sequence[4], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+
+               // the next checkpoint alignment starts now
+               startTs = System.nanoTime();
+               check(sequence[9], buffer.getNextNonBlocked());
+               check(sequence[11], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+               check(sequence[15], buffer.getNextNonBlocked());
+
+               // checkpoint done
+               check(sequence[7], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+
+               // queued data
+               check(sequence[10], buffer.getNextNonBlocked());
+               check(sequence[14], buffer.getNextNonBlocked());
+
+               // trailing data
+               check(sequence[18], buffer.getNextNonBlocked());
+               check(sequence[19], buffer.getNextNonBlocked());
+               check(sequence[20], buffer.getNextNonBlocked());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+
+               // check overall notifications
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+       }
+
+       /**
+        * This tests the case where a cancellation barrier is received for a checkpoint that was
+        * already canceled because a newer checkpoint barrier had been received.
+        */
+       @Test
+       public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws 
Exception {
+               BufferOrEvent[] sequence = {
+                               // starting a checkpoint
+                               /*  0 */ createBuffer(2),
+                               /*  1 */ createBarrier(3, 1), createBarrier(3, 
0),
+                               /*  3 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
+
+                               // newer checkpoint barrier cancels/subsumes 
pending checkpoint
+                               /*  6 */ createBarrier(5, 2),
+
+                               // some queued buffers
+                               /*  7 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
+
+                               // cancel barrier for the initial checkpoint (it is already canceled)
+                               /* 10 */ createCancellationBarrier(3, 2),
+
+                               // some more buffers
+                               /* 11 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
+
+                               // complete next checkpoint regularly
+                               /* 14 */ createBarrier(5, 0), createBarrier(5, 
1),
+
+                               // some more buffers
+                               /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+               StatefulTask toNotify = mock(StatefulTask.class);
+               buffer.registerCheckpointEventHandler(toNotify);
+
+               long startTs;
+
+               // validate the sequence
+
+               check(sequence[0], buffer.getNextNonBlocked());
+
+               // beginning of first checkpoint
+               check(sequence[5], buffer.getNextNonBlocked());
+
+               // future barrier aborts checkpoint
+               startTs = System.nanoTime();
+               check(sequence[3], buffer.getNextNonBlocked());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+               check(sequence[4], buffer.getNextNonBlocked());
+
+               // alignment of next checkpoint
+               check(sequence[8], buffer.getNextNonBlocked());
+               check(sequence[9], buffer.getNextNonBlocked());
+               check(sequence[12], buffer.getNextNonBlocked());
+               check(sequence[13], buffer.getNextNonBlocked());
+
+               // checkpoint finished
+               check(sequence[7], buffer.getNextNonBlocked());
+               validateAlignmentTime(startTs, buffer);
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
+               check(sequence[11], buffer.getNextNonBlocked());
+
+               // remaining data
+               check(sequence[16], buffer.getNextNonBlocked());
+               check(sequence[17], buffer.getNextNonBlocked());
+               check(sequence[18], buffer.getNextNonBlocked());
+
+               // all done
+               assertNull(buffer.getNextNonBlocked());
+               assertNull(buffer.getNextNonBlocked());
+
+               buffer.cleanup();
+               checkNoTempFilesRemain();
+
+               // check overall notifications
+               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
        }
 
        // 
------------------------------------------------------------------------
        //  Utils
        // 
------------------------------------------------------------------------
 
-       private static BufferOrEvent createBarrier(long id, int channel) {
-               return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
+       /** Creates a checkpoint barrier for the given checkpoint on the given channel, stamped with the current time. */
+       private static BufferOrEvent createBarrier(long checkpointId, int channel) {
+               final CheckpointBarrier barrier = new CheckpointBarrier(checkpointId, System.currentTimeMillis());
+               return new BufferOrEvent(barrier, channel);
+       }
+
+       /** Creates a cancellation barrier (a cancel-checkpoint marker) for the given checkpoint on the given channel. */
+       private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
+               final CancelCheckpointMarker marker = new CancelCheckpointMarker(checkpointId);
+               return new BufferOrEvent(marker, channel);
        }
 
        private static BufferOrEvent createBuffer(int channel) {
-               // since we have no access to the contents, we need to use the 
size as an
-               // identifier to validate correctness here
-               Buffer buf = new Buffer(
-                               
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
-                               FreeingBufferRecycler.INSTANCE);
-               
-               buf.setSize(SIZE_COUNTER++);
+               // every buffer gets a distinct size and random contents, so check() can tell buffers apart
+               // NOTE(review): SIZE_COUNTER is static and, in the code visible here, only ever incremented;
+               // once it exceeds PAGE_SIZE the payload no longer fits the page and this presumably fails - confirm
+               final int size = SIZE_COUNTER++;
+               final byte[] payload = new byte[size];
+               RND.nextBytes(payload);
+
+               final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+               segment.put(0, payload);
+
+               final Buffer buf = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
+               buf.setSize(size);
+               // take one extra reference so the buffer is not disposed of once the input gate has read it
+               buf.retain();
+
                return new BufferOrEvent(buf, channel);
        }
 
@@ -932,15 +1413,16 @@ public class BarrierBufferTest {
                assertEquals(expected.isBuffer(), present.isBuffer());
                
                if (expected.isBuffer()) {
-                       // since we have no access to the contents, we need to 
use the size as an
-                       // identifier to validate correctness here
                        assertEquals(expected.getBuffer().getSize(), 
present.getBuffer().getSize());
+                       MemorySegment expectedMem = 
expected.getBuffer().getMemorySegment();
+                       MemorySegment presentMem = 
present.getBuffer().getMemorySegment();
+                       assertTrue("memory contents differs", 
expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
                }
                else {
                        assertEquals(expected.getEvent(), present.getEvent());
                }
        }
-       
+
        private static void checkNoTempFilesRemain() {
                // validate that all temp files have been removed
                for (File dir : IO_MANAGER.getSpillingDirectories()) {
@@ -985,8 +1467,10 @@ public class BarrierBufferTest {
 
                @Override
                public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData) throws Exception {
+                       assertTrue("wrong checkpoint id",
+                                       nextExpectedCheckpointId == -1L || 
+                                       nextExpectedCheckpointId == 
checkpointMetaData.getCheckpointId());
 
-                       assertTrue("wrong checkpoint id", 
nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == 
checkpointMetaData.getCheckpointId());
                        assertTrue(checkpointMetaData.getTimestamp() > 0);
                        
assertTrue(checkpointMetaData.getBytesBufferedInAlignment() >= 0);
                        
assertTrue(checkpointMetaData.getAlignmentDurationNanos() >= 0);
@@ -995,8 +1479,32 @@ public class BarrierBufferTest {
                }
 
                @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {}
+
+               @Override
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                        throw new UnsupportedOperationException("should never 
be called");
                }
        }
+
+       private static class CheckpointMatcher extends 
BaseMatcher<CheckpointMetaData> {
+
+               private final long checkpointId;
+
+               CheckpointMatcher(long checkpointId) {
+                       this.checkpointId = checkpointId;
+               }
+
+               @Override
+               public boolean matches(Object o) {
+                       return o != null &&
+                                       o.getClass() == 
CheckpointMetaData.class &&
+                                       ((CheckpointMetaData) 
o).getCheckpointId() == checkpointId;
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendText("CheckpointMetaData - id = " + 
checkpointId);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a79dd5f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 7cfbb66..fa3363e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -20,21 +20,17 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
+
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -45,9 +41,9 @@ import static org.junit.Assert.fail;
  * Tests for the behavior of the barrier tracker.
  */
 public class BarrierTrackerTest {
-       
+
        private static final int PAGE_SIZE = 512;
-       
+
        @Test
        public void testSingleChannelNoBarriers() {
                try {
@@ -339,6 +335,98 @@ public class BarrierTrackerTest {
                }
        }
 
+       @Test
+       public void testSingleChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               createBuffer(0),
+                               createBarrier(1, 0),
+                               createBuffer(0),
+                               createBarrier(2, 0),
+                               createCancellationBarrier(4, 0),
+                               createBarrier(5, 0),
+                               createBuffer(0),
+                               createCancellationBarrier(6, 0),
+                               createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
+               BarrierTracker tracker = new BarrierTracker(gate);
+
+               // negative values mean an expected cancellation call!
+               CheckpointSequenceValidator validator =
+                               new CheckpointSequenceValidator(1, 2, -4, 5, 
-6);
+               tracker.registerCheckpointEventHandler(validator);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer()) {
+                               assertEquals(boe, tracker.getNextNonBlocked());
+                       }
+                       assertTrue(tracker.isEmpty());
+               }
+
+               assertNull(tracker.getNextNonBlocked());
+               assertNull(tracker.getNextNonBlocked());
+       }
+
+       @Test
+       public void testMultiChannelAbortCheckpoint() throws Exception {
+               BufferOrEvent[] sequence = {
+                               // some buffers and a successful checkpoint
+                               createBuffer(0), createBuffer(2), 
createBuffer(0),
+                               createBarrier(1, 1), createBarrier(1, 2),
+                               createBuffer(2), createBuffer(1),
+                               createBarrier(1, 0),
+
+                               // aborted on last barrier
+                               createBuffer(0), createBuffer(2),
+                               createBarrier(2, 0), createBarrier(2, 2),
+                               createBuffer(0), createBuffer(2),
+                               createCancellationBarrier(2, 1),
+
+                               // successful checkpoint
+                               createBuffer(2), createBuffer(1),
+                               createBarrier(3, 1), createBarrier(3, 2), 
createBarrier(3, 0),
+
+                               // abort on first barrier
+                               createBuffer(0), createBuffer(1),
+                               createCancellationBarrier(4, 1), 
createBarrier(4, 2),
+                               createBuffer(0),
+                               createBarrier(4, 0),
+
+                               // another successful checkpoint
+                               createBuffer(0), createBuffer(1), 
createBuffer(2),
+                               createBarrier(5, 2), createBarrier(5, 1), 
createBarrier(5, 0),
+
+                               // abort multiple cancellations and a barrier 
after the cancellations 
+                               createBuffer(0), createBuffer(1),
+                               createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
+                               createBarrier(6, 0),
+
+                               createBuffer(0)
+               };
+
+               MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
+               BarrierTracker tracker = new BarrierTracker(gate);
+
+               // negative values mean an expected cancellation call!
+               CheckpointSequenceValidator validator =
+                               new CheckpointSequenceValidator(1, -2, 3, -4, 
5, -6);
+               tracker.registerCheckpointEventHandler(validator);
+
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer()) {
+                               assertEquals(boe, tracker.getNextNonBlocked());
+                       }
+               }
+
+               assertTrue(tracker.isEmpty());
+
+               assertNull(tracker.getNextNonBlocked());
+               assertNull(tracker.getNextNonBlocked());
+
+               assertTrue(tracker.isEmpty());
+       }
+       
        // 
------------------------------------------------------------------------
        //  Utils
        // 
------------------------------------------------------------------------
@@ -347,6 +435,10 @@ public class BarrierTrackerTest {
                return new BufferOrEvent(new CheckpointBarrier(id, 
System.currentTimeMillis()), channel);
        }
 
+       private static BufferOrEvent createCancellationBarrier(long id, int 
channel) {
+               return new BufferOrEvent(new CancelCheckpointMarker(id), 
channel);
+       }
+
        private static BufferOrEvent createBuffer(int channel) {
                return new BufferOrEvent(
                                new Buffer(MemorySegmentFactory.wrap(new 
byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
@@ -368,7 +460,6 @@ public class BarrierTrackerTest {
 
                @Override
                public void setInitialState(TaskStateHandles taskStateHandles) 
throws Exception {
-
                        throw new UnsupportedOperationException("should never 
be called");
                }
 
@@ -379,10 +470,27 @@ public class BarrierTrackerTest {
 
                @Override
                public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData) throws Exception {
+                       assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
+
+                       final long expectedId = checkpointIDs[i++];
+                       if (expectedId >= 0) {
+                               assertEquals("wrong checkpoint id", expectedId, 
checkpointMetaData.getCheckpointId());
+                               assertTrue(checkpointMetaData.getTimestamp() > 
0);
+                       } else {
+                               fail("got 'triggerCheckpointOnBarrier()' when 
expecting an 'abortCheckpointOnBarrier()'");
+                       }
+               }
 
+               @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {
                        assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
-                       assertEquals("wrong checkpoint id", checkpointIDs[i++], 
checkpointMetaData.getCheckpointId());
-                       assertTrue(checkpointMetaData.getTimestamp() > 0);
+
+                       final long expectedId = checkpointIDs[i++];
+                       if (expectedId < 0) {
+                               assertEquals("wrong checkpoint id for checkoint 
abort", -expectedId, checkpointId);
+                       } else {
+                               fail("got 'abortCheckpointOnBarrier()' when 
expecting an 'triggerCheckpointOnBarrier()'");
+                       }
                }
 
                @Override

Reply via email to