[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal 
aborted checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1f028de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1f028de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1f028de

Branch: refs/heads/release-1.1
Commit: a1f028dee49928ada014632bb27216b36e30250e
Parents: 4dd3efe
Author: Stephan Ewen <[email protected]>
Authored: Sun Oct 23 18:41:32 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 18:27:47 2016 +0100

----------------------------------------------------------------------
 .../io/network/api/CancelCheckpointMarker.java  |  77 +++
 .../api/serialization/EventSerializer.java      |  57 +-
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../partition/PipelinedSubpartition.java        |   3 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  21 +
 .../api/serialization/EventSerializerTest.java  |  45 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  10 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  10 +
 .../streaming/runtime/io/BarrierBuffer.java     | 265 ++++++--
 .../streaming/runtime/io/BarrierTracker.java    | 164 +++--
 .../streaming/runtime/io/BufferSpiller.java     |   2 +-
 .../runtime/io/CheckpointBarrierHandler.java    |  22 +-
 .../runtime/io/StreamInputProcessor.java        |   5 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   5 +-
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  32 +-
 .../streaming/runtime/tasks/StreamTask.java     |  48 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 617 +++++++++++++++++--
 .../runtime/io/BarrierTrackerTest.java          | 157 ++++-
 20 files changed, 1291 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
new file mode 100644
index 0000000..52a2517
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * The CancelCheckpointMarker travels through the data streams, similar to the 
{@link CheckpointBarrier},
+ * but signals that a certain checkpoint should be canceled. Any in-progress 
alignment for that
+ * checkpoint needs to be canceled and regular processing should be resumed.
+ */
+public class CancelCheckpointMarker extends RuntimeEvent {
+
+       /** The id of the checkpoint to be canceled */
+       private final long checkpointId;
+
+       public CancelCheckpointMarker(long checkpointId) {
+               this.checkpointId = checkpointId;
+       }
+
+       public long getCheckpointId() {
+               return checkpointId;
+       }
+
+       // 
------------------------------------------------------------------------
+       // These known and common events go through special code paths,
+       // rather than
+       // through generic serialization
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               throw new UnsupportedOperationException("this method should 
never be called");
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               throw new UnsupportedOperationException("this method should 
never be called");
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return (int) (checkpointId ^ (checkpointId >>> 32));
+       }
+
+       @Override
+       public boolean equals(Object other) {
+               return other != null && 
+                               other.getClass() == 
CancelCheckpointMarker.class &&
+                               this.checkpointId == ((CancelCheckpointMarker) 
other).checkpointId;
+       }
+
+       @Override
+       public String toString() {
+               return "CancelCheckpointMarker " + checkpointId;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index a34f8cf..0bc3b28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -38,7 +39,7 @@ import java.nio.ByteOrder;
  * Utility class to serialize and deserialize task events.
  */
 public class EventSerializer {
-       
+
        private static final int END_OF_PARTITION_EVENT = 0;
 
        private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -46,17 +47,19 @@ public class EventSerializer {
        private static final int END_OF_SUPERSTEP_EVENT = 2;
 
        private static final int OTHER_EVENT = 3;
-       
+
+       private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
+
        // 
------------------------------------------------------------------------
-       
-       public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+
+       public static ByteBuffer toSerializedEvent(AbstractEvent event) throws 
IOException {
                final Class<?> eventClass = event.getClass();
                if (eventClass == EndOfPartitionEvent.class) {
                        return ByteBuffer.wrap(new byte[] { 0, 0, 0, 
END_OF_PARTITION_EVENT });
                }
                else if (eventClass == CheckpointBarrier.class) {
                        CheckpointBarrier barrier = (CheckpointBarrier) event;
-                       
+
                        ByteBuffer buf = ByteBuffer.allocate(20);
                        buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
                        buf.putLong(4, barrier.getId());
@@ -66,32 +69,39 @@ public class EventSerializer {
                else if (eventClass == EndOfSuperstepEvent.class) {
                        return ByteBuffer.wrap(new byte[] { 0, 0, 0, 
END_OF_SUPERSTEP_EVENT });
                }
+               else if (eventClass == CancelCheckpointMarker.class) {
+                       CancelCheckpointMarker marker = 
(CancelCheckpointMarker) event;
+
+                       ByteBuffer buf = ByteBuffer.allocate(12);
+                       buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
+                       buf.putLong(4, marker.getCheckpointId());
+                       return buf;
+               }
                else {
                        try {
                                final DataOutputSerializer serializer = new 
DataOutputSerializer(128);
                                serializer.writeInt(OTHER_EVENT);
                                serializer.writeUTF(event.getClass().getName());
                                event.write(serializer);
-       
                                return serializer.wrapAsByteBuffer();
                        }
                        catch (IOException e) {
-                               throw new RuntimeException("Error while 
serializing event.", e);
+                               throw new IOException("Error while serializing 
event.", e);
                        }
                }
        }
 
-       public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, 
ClassLoader classLoader) {
+       public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, 
ClassLoader classLoader) throws IOException {
                if (buffer.remaining() < 4) {
-                       throw new RuntimeException("Incomplete event");
+                       throw new IOException("Incomplete event");
                }
-               
+
                final ByteOrder bufferOrder = buffer.order();
                buffer.order(ByteOrder.BIG_ENDIAN);
-               
+
                try {
                        int type = buffer.getInt();
-                               
+
                        if (type == END_OF_PARTITION_EVENT) {
                                return EndOfPartitionEvent.INSTANCE;
                        }
@@ -103,35 +113,38 @@ public class EventSerializer {
                        else if (type == END_OF_SUPERSTEP_EVENT) {
                                return EndOfSuperstepEvent.INSTANCE;
                        }
+                       else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
+                               long id = buffer.getLong();
+                               return new CancelCheckpointMarker(id);
+                       }
                        else if (type == OTHER_EVENT) {
                                try {
                                        final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
-               
                                        final String className = 
deserializer.readUTF();
-               
+
                                        final Class<? extends AbstractEvent> 
clazz;
                                        try {
                                                clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
                                        }
                                        catch (ClassNotFoundException e) {
-                                               throw new 
RuntimeException("Could not load event class '" + className + "'.", e);
+                                               throw new IOException("Could 
not load event class '" + className + "'.", e);
                                        }
                                        catch (ClassCastException e) {
-                                               throw new RuntimeException("The 
class '" + className + "' is not a valid subclass of '"
+                                               throw new IOException("The 
class '" + className + "' is not a valid subclass of '"
                                                                + 
AbstractEvent.class.getName() + "'.", e);
                                        }
-               
+
                                        final AbstractEvent event = 
InstantiationUtil.instantiate(clazz, AbstractEvent.class);
                                        event.read(deserializer);
-               
+
                                        return event;
                                }
                                catch (Exception e) {
-                                       throw new RuntimeException("Error while 
deserializing or instantiating event.", e);
+                                       throw new IOException("Error while 
deserializing or instantiating event.", e);
                                }
                        } 
                        else {
-                               throw new RuntimeException("Corrupt byte stream 
for event");
+                               throw new IOException("Corrupt byte stream for 
event");
                        }
                }
                finally {
@@ -143,7 +156,7 @@ public class EventSerializer {
        // Buffer helpers
        // 
------------------------------------------------------------------------
 
-       public static Buffer toBuffer(AbstractEvent event) {
+       public static Buffer toBuffer(AbstractEvent event) throws IOException {
                final ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(event);
 
                MemorySegment data = 
MemorySegmentFactory.wrap(serializedEvent.array());
@@ -154,7 +167,7 @@ public class EventSerializer {
                return buffer;
        }
 
-       public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader 
classLoader) {
+       public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader 
classLoader) throws IOException {
                return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..6f6001b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -463,7 +463,7 @@ abstract class NettyMessage {
                }
 
                @Override
-               public void readFrom(ByteBuf buffer) {
+               public void readFrom(ByteBuf buffer) throws IOException {
                        // TODO Directly deserialize fromNetty's buffer
                        int length = buffer.readInt();
                        ByteBuffer serializedEvent = 
ByteBuffer.allocate(length);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..b703acb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public void finish() {
+       public void finish() throws IOException {
                final NotificationListener listener;
 
                synchronized (buffers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..7c581df 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -47,6 +47,27 @@ public interface StatefulTask<T extends StateHandle<?>> {
         */
        boolean triggerCheckpoint(long checkpointId, long timestamp) throws 
Exception;
 
+       /**
+        * This method is called when a checkpoint is triggered as a result of 
receiving checkpoint
+        * barriers on all input streams.
+        *
+        * @param checkpointId The ID of the checkpoint, incrementing.
+        * @param timestamp The timestamp when the checkpoint was triggered at 
the JobManager.
+        *
+        * @throws Exception Exceptions thrown as the result of triggering a 
checkpoint are forwarded.
+        */
+       void triggerCheckpointOnBarrier(long checkpointId, long timestamp) 
throws Exception;
+
+       /**
+        * Aborts a checkpoint as the result of receiving possibly some 
checkpoint barriers,
+        * but at least one {@link 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+        * 
+        * <p>This requires implementing tasks to forward a
+        * {@link 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their 
outputs.
+        * 
+        * @param checkpointId The ID of the checkpoint to be aborted.
+        */
+       void abortCheckpointOnBarrier(long checkpointId) throws Exception;
 
        /**
         * Invoked when a checkpoint has been completed, i.e., when the 
checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ddfd758..d47a0b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -28,34 +29,30 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class EventSerializerTest {
 
        @Test
-       public void testSerializeDeserializeEvent() {
-               try {
-                       AbstractEvent[] events = {
-                                       EndOfPartitionEvent.INSTANCE,
-                                       EndOfSuperstepEvent.INSTANCE,
-                                       new CheckpointBarrier(1678L, 4623784L),
-                                       new TestTaskEvent(Math.random(), 
12361231273L)
-                       };
-                       
-                       for (AbstractEvent evt : events) {
-                               ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(evt);
-                               assertTrue(serializedEvent.hasRemaining());
-
-                               AbstractEvent deserialized = 
-                                               
EventSerializer.fromSerializedEvent(serializedEvent, 
getClass().getClassLoader());
-                               assertNotNull(deserialized);
-                               assertEquals(evt, deserialized);
-                       }
-                       
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+       public void testSerializeDeserializeEvent() throws Exception {
+               AbstractEvent[] events = {
+                               EndOfPartitionEvent.INSTANCE,
+                               EndOfSuperstepEvent.INSTANCE,
+                               new CheckpointBarrier(1678L, 4623784L),
+                               new TestTaskEvent(Math.random(), 12361231273L),
+                               new CancelCheckpointMarker(287087987329842L)
+               };
+               
+               for (AbstractEvent evt : events) {
+                       ByteBuffer serializedEvent = 
EventSerializer.toSerializedEvent(evt);
+                       assertTrue(serializedEvent.hasRemaining());
+
+                       AbstractEvent deserialized = 
+                                       
EventSerializer.fromSerializedEvent(serializedEvent, 
getClass().getClassLoader());
+                       assertNotNull(deserialized);
+                       assertEquals(evt, deserialized);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..4dfaf95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -450,6 +450,16 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
+               public void triggerCheckpointOnBarrier(long checkpointId, long 
timestamp) throws Exception {
+                       throw new UnsupportedOperationException("should not be 
called!");
+               }
+
+               @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {
+                       throw new UnsupportedOperationException("should not be 
called!");
+               }
+
+               @Override
                public void notifyCheckpointComplete(long checkpointId) {
                        if (completedCheckpoints++ > 
NUM_CHECKPOINTS_TO_COMPLETE) {
                                completedCheckpointsLatch.countDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0c0d064..5b344eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -221,6 +221,16 @@ public class TaskAsyncCallTest {
                }
 
                @Override
+               public void triggerCheckpointOnBarrier(long checkpointId, long 
timestamp) throws Exception {
+                       throw new UnsupportedOperationException("Should not be 
called");
+               }
+
+               @Override
+               public void abortCheckpointOnBarrier(long checkpointId) {
+                       throw new UnsupportedOperationException("Should not be 
called");
+               }
+
+               @Override
                public void notifyCheckpointComplete(long checkpointId) {
                        if (checkpointId != lastCheckpointId && this.error == 
null) {
                                this.error = new Exception("calls out of 
order");

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..36de717 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,42 +17,43 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs 
with barriers until
  * all inputs have received the barrier for a given checkpoint.
  * 
  * <p>To avoid back-pressuring the input streams (which may cause distributed 
deadlocks), the
  * BarrierBuffer continues receiving buffers from the blocked channels and 
stores them internally until 
- * the blocks are released.</p>
+ * the blocks are released.
  */
 @Internal
 public class BarrierBuffer implements CheckpointBarrierHandler {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
-       
+
        /** The gate that the buffer draws its input from */
        private final InputGate inputGate;
 
        /** Flags that indicate whether a channel is currently blocked/buffered 
*/
        private final boolean[] blockedChannels;
-       
+
        /** The total number of channels that this buffer handles data from */
        private final int totalNumberOfInputChannels;
-       
+
        /** To utility to write blocked data to a file channel */
        private final BufferSpiller bufferSpiller;
 
@@ -65,17 +66,24 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
        /** Handler that receives the checkpoint notifications */
-       private EventListener<CheckpointBarrier> checkpointHandler;
+       private StatefulTask<?> toNotifyOnCheckpoint;
 
        /** The ID of the checkpoint for which we expect barriers */
        private long currentCheckpointId = -1L;
 
-       /** The number of received barriers (= number of blocked/buffered 
channels) */
+       /** The number of received barriers (= number of blocked/buffered 
channels)
+        * IMPORTANT: A canceled checkpoint must always have 0 barriers */
        private int numBarriersReceived;
-       
+
        /** The number of already closed channels */
        private int numClosedChannels;
-       
+
+       /** The timestamp as in {@link System#nanoTime()} at which the last 
alignment started */
+       private long startOfAlignmentTimestamp;
+
+       /** The time (in nanoseconds) that the latest alignment took */
+       private long latestAlignmentDurationNanos;
+
        /** Flag to indicate whether we have drawn all available input */
        private boolean endOfStream;
 
@@ -90,7 +98,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                this.inputGate = inputGate;
                this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
                this.blockedChannels = new 
boolean[this.totalNumberOfInputChannels];
-               
+
                this.bufferSpiller = new BufferSpiller(ioManager, 
inputGate.getPageSize());
                this.queuedBuffered = new 
ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
        }
@@ -100,7 +108,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        // 
------------------------------------------------------------------------
 
        @Override
-       public BufferOrEvent getNextNonBlocked() throws IOException, 
InterruptedException {
+       public BufferOrEvent getNextNonBlocked() throws Exception {
                while (true) {
                        // process buffered BufferOrEvents before grabbing new 
ones
                        BufferOrEvent next;
@@ -114,7 +122,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                        return getNextNonBlocked();
                                }
                        }
-                       
+
                        if (next != null) {
                                if (isBlocked(next.getChannelIndex())) {
                                        // if the channel is blocked we, we 
just store the BufferOrEvent
@@ -129,27 +137,29 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                                
processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
                                        }
                                }
+                               else if (next.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                                       
processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+                               }
                                else {
                                        if (next.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                               numClosedChannels++;
-                                               // no chance to complete this 
checkpoint
-                                               releaseBlocks();
+                                               
processEndOfPartition(next.getChannelIndex());
                                        }
                                        return next;
                                }
                        }
                        else if (!endOfStream) {
-                               // end of stream. we feed the data that is 
still buffered
+                               // end of input stream. stream continues with 
the buffered data
                                endOfStream = true;
-                               releaseBlocks();
+                               releaseBlocksAndResetBarriers();
                                return getNextNonBlocked();
                        }
                        else {
+                               // final end of both input and buffered data
                                return null;
                        }
                }
        }
-       
+
        private void completeBufferedSequence() throws IOException {
                currentBuffered.cleanup();
                currentBuffered = queuedBuffered.pollFirst();
@@ -157,66 +167,175 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        currentBuffered.open();
                }
        }
-       
-       private void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws IOException {
+
+       private void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
                final long barrierId = receivedBarrier.getId();
 
+               // fast path for single channel cases
+               if (totalNumberOfInputChannels == 1) {
+                       if (barrierId > currentCheckpointId) {
+                               // new checkpoint
+                               currentCheckpointId = barrierId;
+                               notifyCheckpoint(receivedBarrier);
+                       }
+                       return;
+               }
+
+               // -- general code path for multiple input channels --
+
                if (numBarriersReceived > 0) {
-                       // subsequent barrier of a checkpoint.
+                       // this is only true if some alignment is already in 
progress and was not canceled
+
                        if (barrierId == currentCheckpointId) {
                                // regular case
                                onBarrier(channelIndex);
                        }
                        else if (barrierId > currentCheckpointId) {
-                               // we did not complete the current checkpoint
+                               // we did not complete the current checkpoint, 
another started before
                                LOG.warn("Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
                                                "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
 
-                               releaseBlocks();
-                               currentCheckpointId = barrierId;
-                               onBarrier(channelIndex);
+                               // let the task know we are not completing this
+                               notifyAbort(currentCheckpointId);
+
+                               // abort the current checkpoint
+                               releaseBlocksAndResetBarriers();
+
+                               // begin the new checkpoint
+                               beginNewAlignment(barrierId, channelIndex);
                        }
                        else {
-                               // ignore trailing barrier from aborted 
checkpoint
+                               // ignore trailing barrier from an earlier 
checkpoint (obsolete now)
                                return;
                        }
-                       
                }
                else if (barrierId > currentCheckpointId) {
                        // first barrier of a new checkpoint
-                       currentCheckpointId = barrierId;
-                       onBarrier(channelIndex);
+                       beginNewAlignment(barrierId, channelIndex);
                }
                else {
-                       // trailing barrier from previous (skipped) checkpoint
+                       // either the current checkpoint was canceled 
(numBarriers == 0) or
+                       // this barrier is from an old subsumed checkpoint
                        return;
                }
 
-               // check if we have all barriers
+               // check if we have all barriers - since canceled checkpoints 
always have zero barriers
+               // this can only happen on a non canceled checkpoint
                if (numBarriersReceived + numClosedChannels == 
totalNumberOfInputChannels) {
+                       // actually trigger checkpoint
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received all barrier, triggering 
checkpoint {} at {}",
+                               LOG.debug("Received all barriers, triggering 
checkpoint {} at {}",
                                                receivedBarrier.getId(), 
receivedBarrier.getTimestamp());
                        }
 
-                       if (checkpointHandler != null) {
-                               checkpointHandler.onEvent(receivedBarrier);
+                       releaseBlocksAndResetBarriers();
+                       notifyCheckpoint(receivedBarrier);
+               }
+       }
+
+       private void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
+               final long barrierId = cancelBarrier.getCheckpointId();
+
+               // fast path for single channel cases
+               if (totalNumberOfInputChannels == 1) {
+                       if (barrierId > currentCheckpointId) {
+                               // new checkpoint
+                               currentCheckpointId = barrierId;
+                               notifyAbort(barrierId);
                        }
-                       
-                       releaseBlocks();
+                       return;
+               }
+
+               // -- general code path for multiple input channels --
+
+               if (numBarriersReceived > 0) {
+                       // this is only true if some alignment is in progress 
and nothing was canceled
+
+                       if (barrierId == currentCheckpointId) {
+                               // cancel this alignment
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Checkpoint {} canceled, 
aborting alignment", barrierId);
+                               }
+
+                               releaseBlocksAndResetBarriers();
+                               notifyAbort(barrierId);
+                       }
+                       else if (barrierId > currentCheckpointId) {
+                               // we canceled the next which also cancels the 
current
+                               LOG.warn("Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
+                                               "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
+
+                               // this stops the current alignment
+                               releaseBlocksAndResetBarriers();
+
+                               // the next checkpoint starts as canceled
+                               currentCheckpointId = barrierId;
+                               startOfAlignmentTimestamp = 0L;
+                               latestAlignmentDurationNanos = 0L;
+                               notifyAbort(barrierId);
+                       }
+
+                       // else: ignore trailing (cancellation) barrier from an 
earlier checkpoint (obsolete now)
+
+               }
+               else if (barrierId > currentCheckpointId) {
+                       // first barrier of a new checkpoint is directly a 
cancellation
+
+                       // setting the currentCheckpointId to this 
checkpoint while keeping the numBarriers
+                       // at zero means that no checkpoint barrier can start a 
new alignment
+                       currentCheckpointId = barrierId;
+
+                       startOfAlignmentTimestamp = 0L;
+                       latestAlignmentDurationNanos = 0L;
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Checkpoint {} canceled, skipping 
alignment", barrierId);
+                       }
+
+                       notifyAbort(barrierId);
                }
+
+               // else: trailing barrier from either
+               //   - a previous (subsumed) checkpoint
+               //   - the current checkpoint if it was already canceled
        }
-       
+
+       private void processEndOfPartition(int channel) throws Exception {
+               numClosedChannels++;
+
+               if (numBarriersReceived > 0) {
+                       // let the task know we skip a checkpoint
+                       notifyAbort(currentCheckpointId);
+
+                       // no chance to complete this checkpoint
+                       releaseBlocksAndResetBarriers();
+               }
+       }
+
+       private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) 
throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+                                       checkpointBarrier.getId(), 
checkpointBarrier.getTimestamp());
+               }
+       }
+
+       private void notifyAbort(long checkpointId) throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+               }
+       }
+
+
        @Override
-       public void 
registerCheckpointEventHandler(EventListener<CheckpointBarrier> 
checkpointHandler) {
-               if (this.checkpointHandler == null) {
-                       this.checkpointHandler = checkpointHandler;
+       public void registerCheckpointEventHandler(StatefulTask<?> 
toNotifyOnCheckpoint) {
+               if (this.toNotifyOnCheckpoint == null) {
+                       this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
                }
                else {
-                       throw new IllegalStateException("BarrierBuffer already 
has a registered checkpoint handler");
+                       throw new IllegalStateException("BarrierBuffer already 
has a registered checkpoint notifyee");
                }
        }
-       
+
        @Override
        public boolean isEmpty() {
                return currentBuffered == null;
@@ -231,8 +350,20 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                for (BufferSpiller.SpilledBufferOrEventSequence seq : 
queuedBuffered) {
                        seq.cleanup();
                }
+               queuedBuffered.clear();
        }
-       
+
+       private void beginNewAlignment(long checkpointId, int channelIndex) 
throws IOException {
+               currentCheckpointId = checkpointId;
+               onBarrier(channelIndex);
+
+               startOfAlignmentTimestamp = System.nanoTime();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId);
+               }
+       }
+
        /**
         * Checks whether the channel with the given index is blocked.
         * 
@@ -242,7 +373,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private boolean isBlocked(int channelIndex) {
                return blockedChannels[channelIndex];
        }
-       
+
        /**
         * Blocks the given channel index, from which a barrier has been 
received.
         * 
@@ -251,30 +382,28 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        private void onBarrier(int channelIndex) throws IOException {
                if (!blockedChannels[channelIndex]) {
                        blockedChannels[channelIndex] = true;
+
                        numBarriersReceived++;
-                       
+
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Received barrier from channel " + 
channelIndex);
                        }
                }
                else {
-                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint and input stream");
+                       throw new IOException("Stream corrupt: Repeated barrier 
for same checkpoint on input " + channelIndex);
                }
        }
 
        /**
-        * Releases the blocks on all channels. Makes sure the just written data
-        * is the next to be consumed.
+        * Releases the blocks on all channels and resets the barrier count.
+        * Makes sure the just written data is the next to be consumed.
         */
-       private void releaseBlocks() throws IOException {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Releasing blocks");
-               }
+       private void releaseBlocksAndResetBarriers() throws IOException {
+               LOG.debug("End of stream alignment, feeding buffered data 
back");
 
                for (int i = 0; i < blockedChannels.length; i++) {
                        blockedChannels[i] = false;
                }
-               numBarriersReceived = 0;
 
                if (currentBuffered == null) {
                        // common case: no more buffered data
@@ -295,10 +424,18 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                currentBuffered = bufferedNow;
                        }
                }
+
+               // the next barrier that comes must assume it is the first
+               numBarriersReceived = 0;
+
+               if (startOfAlignmentTimestamp > 0) {
+                       latestAlignmentDurationNanos = System.nanoTime() - 
startOfAlignmentTimestamp;
+                       startOfAlignmentTimestamp = 0;
+               }
        }
 
        // 
------------------------------------------------------------------------
-       // For Testing
+       //  Properties
        // 
------------------------------------------------------------------------
 
        /**
@@ -309,7 +446,17 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
        public long getCurrentCheckpointId() {
                return this.currentCheckpointId;
        }
-       
+
+       @Override
+       public long getAlignmentDurationNanos() {
+               long start = this.startOfAlignmentTimestamp;
+               if (start <= 0) {
+                       return latestAlignmentDurationNanos;
+               } else {
+                       return System.nanoTime() - start;
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Utilities 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..5157336 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,12 +19,12 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
-import java.io.IOException;
 import java.util.ArrayDeque;
 
 /**
@@ -34,9 +34,9 @@ import java.util.ArrayDeque;
  * 
  * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the 
input
  * channels that have sent barriers, so it cannot be used to gain 
"exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing 
guarantees.</p>
+ * guarantees. It can, however, be used to gain "at least once" processing 
guarantees.
  * 
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have 
higher checkpoint IDs.</p>
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have 
higher checkpoint IDs.
  */
 @Internal
 public class BarrierTracker implements CheckpointBarrierHandler {
@@ -57,11 +57,12 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
        
        /** The listener to be notified on complete checkpoints */
-       private EventListener<CheckpointBarrier> checkpointHandler;
+       private StatefulTask<?> toNotifyOnCheckpoint;
        
        /** The highest checkpoint ID encountered so far */
        private long latestPendingCheckpointID = -1;
-       
+
+       // 
------------------------------------------------------------------------
        
        public BarrierTracker(InputGate inputGate) {
                this.inputGate = inputGate;
@@ -70,28 +71,33 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
        }
 
        @Override
-       public BufferOrEvent getNextNonBlocked() throws IOException, 
InterruptedException {
+       public BufferOrEvent getNextNonBlocked() throws Exception {
                while (true) {
                        BufferOrEvent next = inputGate.getNextBufferOrEvent();
-                       if (next == null) {
-                               return null;
-                       }
-                       else if (next.isBuffer() || next.getEvent().getClass() 
!= CheckpointBarrier.class) {
+                       if (next == null || next.isBuffer()) {
+                               // buffer or input exhausted
                                return next;
                        }
-                       else {
+                       else if (next.getEvent().getClass() == 
CheckpointBarrier.class) {
                                processBarrier((CheckpointBarrier) 
next.getEvent());
                        }
+                       else if (next.getEvent().getClass() == 
CancelCheckpointMarker.class) {
+                               
processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+                       }
+                       else {
+                               // some other event
+                               return next;
+                       }
                }
        }
 
        @Override
-       public void 
registerCheckpointEventHandler(EventListener<CheckpointBarrier> 
checkpointHandler) {
-               if (this.checkpointHandler == null) {
-                       this.checkpointHandler = checkpointHandler;
+       public void registerCheckpointEventHandler(StatefulTask<?> 
toNotifyOnCheckpoint) {
+               if (this.toNotifyOnCheckpoint == null) {
+                       this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
                }
                else {
-                       throw new IllegalStateException("BarrierTracker already 
has a registered checkpoint handler");
+                       throw new IllegalStateException("BarrierTracker already 
has a registered checkpoint notifyee");
                }
        }
 
@@ -105,22 +111,27 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                return pendingCheckpoints.isEmpty();
        }
 
-       private void processBarrier(CheckpointBarrier receivedBarrier) {
+       @Override
+       public long getAlignmentDurationNanos() {
+               // this one does not do alignment at all
+               return 0L;
+       }
+
+       private void processBarrier(CheckpointBarrier receivedBarrier) throws 
Exception {
+               final long barrierId = receivedBarrier.getId();
+
                // fast path for single channel trackers
                if (totalNumberOfInputChannels == 1) {
-                       if (checkpointHandler != null) {
-                               checkpointHandler.onEvent(receivedBarrier);
-                       }
+                       notifyCheckpoint(barrierId, 
receivedBarrier.getTimestamp());
                        return;
                }
-               
+
                // general path for multiple input channels
-               final long barrierId = receivedBarrier.getId();
 
                // find the checkpoint barrier in the queue of bending barriers
                CheckpointBarrierCount cbc = null;
                int pos = 0;
-               
+
                for (CheckpointBarrierCount next : pendingCheckpoints) {
                        if (next.checkpointId == barrierId) {
                                cbc = next;
@@ -128,21 +139,21 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                        }
                        pos++;
                }
-               
+
                if (cbc != null) {
                        // add one to the count to that barrier and check for 
completion
                        int numBarriersNew = cbc.incrementBarrierCount();
                        if (numBarriersNew == totalNumberOfInputChannels) {
-                               // checkpoint can be triggered
+                               // checkpoint can be triggered (or is aborted 
and all barriers have been seen)
                                // first, remove this checkpoint and all all 
prior pending
                                // checkpoints (which are now subsumed)
                                for (int i = 0; i <= pos; i++) {
                                        pendingCheckpoints.pollFirst();
                                }
-                               
+
                                // notify the listener
-                               if (checkpointHandler != null) {
-                                       
checkpointHandler.onEvent(receivedBarrier);
+                               if (!cbc.isAborted()) {
+                                       
notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
                                }
                        }
                }
@@ -163,45 +174,104 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                }
        }
 
+       private void processCheckpointAbortBarrier(CancelCheckpointMarker 
barrier) throws Exception {
+               final long checkpointId = barrier.getCheckpointId();
+
+               // fast path for single channel trackers
+               if (totalNumberOfInputChannels == 1) {
+                       notifyAbort(checkpointId);
+                       return;
+               }
+
+               // -- general path for multiple input channels --
+
+               // find the checkpoint barrier in the queue of pending barriers
+               // while doing this we "abort" all checkpoints before that one
+               CheckpointBarrierCount cbc;
+               while ((cbc = pendingCheckpoints.peekFirst()) != null && 
cbc.checkpointId() < checkpointId) {
+                       pendingCheckpoints.removeFirst();
+               }
+
+               if (cbc != null && cbc.checkpointId() == checkpointId) {
+                       // make sure the checkpoint is remembered as aborted
+                       if (cbc.markAborted()) {
+                               // this was the first time the checkpoint was 
aborted - notify
+                               notifyAbort(checkpointId);
+                       }
+
+                       // we still count the barriers to be able to remove the 
entry once all barriers have been seen
+                       if (cbc.incrementBarrierCount() == 
totalNumberOfInputChannels) {
+                               // we can remove this entry
+                               pendingCheckpoints.removeFirst();
+                       }
+               }
+               else {
+                       notifyAbort(checkpointId);
+
+                       // first barrier for this checkpoint - remember it as 
aborted
+                       // since we polled away all entries with lower 
checkpoint IDs
+                       // this entry will become the new first entry
+                       if (pendingCheckpoints.size() < 
MAX_CHECKPOINTS_TO_TRACK) {
+                               CheckpointBarrierCount abortedMarker = new 
CheckpointBarrierCount(checkpointId);
+                               abortedMarker.markAborted();
+                               pendingCheckpoints.addFirst(abortedMarker);
+                       }
+               }
+       }
+
+       private void notifyCheckpoint(long checkpointId, long timestamp) throws 
Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId, timestamp);
+               }
+       }
+
+       private void notifyAbort(long checkpointId) throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
         * Simple class for a checkpoint ID with a barrier counter.
         */
        private static final class CheckpointBarrierCount {
-               
+
                private final long checkpointId;
-               
+
                private int barrierCount;
-               
-               private CheckpointBarrierCount(long checkpointId) {
+
+               private boolean aborted;
+
+               CheckpointBarrierCount(long checkpointId) {
                        this.checkpointId = checkpointId;
                        this.barrierCount = 1;
                }
 
+               public long checkpointId() {
+                       return checkpointId;
+               }
+
                public int incrementBarrierCount() {
                        return ++barrierCount;
                }
-               
-               @Override
-               public int hashCode() {
-                       return (int) ((checkpointId >>> 32) ^ checkpointId) + 
17 * barrierCount; 
+
+               public boolean isAborted() {
+                       return aborted;
                }
 
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof  CheckpointBarrierCount) {
-                               CheckpointBarrierCount that = 
(CheckpointBarrierCount) obj;
-                               return this.checkpointId == that.checkpointId 
&& this.barrierCount == that.barrierCount;
-                       }
-                       else {
-                               return false;
-                       }
+               public boolean markAborted() {
+                       boolean firstAbort = !this.aborted;
+                       this.aborted = true;
+                       return firstAbort;
                }
 
                @Override
                public String toString() {
-                       return String.format("checkpointID=%d, count=%d", 
checkpointId, barrierCount);
+                       return isAborted() ?
+                               String.format("checkpointID=%d - ABORTED", 
checkpointId) :
+                               String.format("checkpointID=%d, count=%d", 
checkpointId, barrierCount);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..dc8d245 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -134,7 +134,7 @@ public class BufferSpiller {
                        else {
                                contents = 
EventSerializer.toSerializedEvent(boe.getEvent());
                        }
-                       
+
                        headBuffer.clear();
                        headBuffer.putInt(boe.getChannelIndex());
                        headBuffer.putInt(contents.remaining());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..ca23491 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import java.io.IOException;
 
@@ -43,14 +42,14 @@ public interface CheckpointBarrierHandler {
         * @throws java.lang.InterruptedException Thrown if the thread is 
interrupted while blocking during
         *                                        waiting for the next 
BufferOrEvent to become available.
         */
-       BufferOrEvent getNextNonBlocked() throws IOException, 
InterruptedException;
+       BufferOrEvent getNextNonBlocked() throws Exception;
 
        /**
-        * Registers the given event handler to be notified on successful 
checkpoints.
-        * 
-        * @param checkpointHandler The handler to register.
+        * Registers the task to be notified once all checkpoint barriers have 
been received for a checkpoint.
+        *
+        * @param task The task to notify
         */
-       void registerCheckpointEventHandler(EventListener<CheckpointBarrier> 
checkpointHandler);
+       void registerCheckpointEventHandler(StatefulTask<?> task);
 
        /**
         * Cleans up all internally held resources.
@@ -64,4 +63,13 @@ public interface CheckpointBarrierHandler {
         * @return {@code True}, if no data is buffered internally, {@code 
false} otherwise.
         */
        boolean isEmpty();
+
+       /**
+        * Gets the time that the latest alignment took, in nanoseconds.
+        * If there is currently an alignment in progress, it will return the 
time spent in the
+        * current alignment so far.
+        *
+        * @return The duration in nanoseconds
+        */
+       long getAlignmentDurationNanos();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..7d9e4d2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 /**
  * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,7 +84,7 @@ public class StreamInputProcessor<IN> {
 
        @SuppressWarnings("unchecked")
        public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> 
inputSerializer,
-                                                               
EventListener<CheckpointBarrier> checkpointListener,
+                                                               StatefulTask<?> 
checkpointListener,
                                                                
CheckpointingMode checkpointMode,
                                                                IOManager 
ioManager,
                                                                boolean 
enableWatermarkMultiplexing) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..a3ae077 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        Collection<InputGate> inputGates2,
                        TypeSerializer<IN1> inputSerializer1,
                        TypeSerializer<IN2> inputSerializer2,
-                       EventListener<CheckpointBarrier> checkpointListener,
+                       StatefulTask<?> checkpointListener,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
                        boolean enableWatermarkMultiplexing) throws IOException 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d18ca16 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                if (numberOfInputs > 0) {
                        InputGate[] inputGates = 
getEnvironment().getAllInputGates();
                        inputProcessor = new 
StreamInputProcessor<IN>(inputGates, inSerializer,
-                                       getCheckpointBarrierListener(), 
+                                       this,
                                        configuration.getCheckpointMode(),
                                        getEnvironment().getIOManager(),
                                        isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0e24516..351acaa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -126,15 +127,32 @@ public class OperatorChain<OUT> {
                }
                
        }
-       
-       
-       public void broadcastCheckpointBarrier(long id, long timestamp) throws 
IOException, InterruptedException {
-               CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp);
-               for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-                       streamOutput.broadcastEvent(barrier);
+
+
+       public void broadcastCheckpointBarrier(long id, long timestamp) throws 
IOException {
+               try {
+                       CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp);
+                       for (RecordWriterOutput<?> streamOutput : 
streamOutputs) {
+                               streamOutput.broadcastEvent(barrier);
+                       }
+               }
+               catch (InterruptedException e) {
+                       throw new IOException("Interrupted while broadcasting 
checkpoint barrier");
                }
        }
-       
+
+       public void broadcastCheckpointCancelMarker(long id) throws IOException 
{
+               try {
+                       CancelCheckpointMarker barrier = new 
CancelCheckpointMarker(id);
+                       for (RecordWriterOutput<?> streamOutput : 
streamOutputs) {
+                               streamOutput.broadcastEvent(barrier);
+                       }
+               }
+               catch (InterruptedException e) {
+                       throw new IOException("Interrupted while broadcasting 
checkpoint cancellation");
+               }
+       }
+
        public RecordWriterOutput<?>[] getStreamOutputs() {
                return streamOutputs;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f28cef..d55a9c5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -37,7 +36,6 @@ import 
org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -580,9 +578,34 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                }
        }
 
-       protected boolean performCheckpoint(final long checkpointId, final long 
timestamp) throws Exception {
+       @Override
+       public void triggerCheckpointOnBarrier(long checkpointId, long 
timestamp) throws Exception {
+               try {
+                       performCheckpoint(checkpointId, timestamp);
+               }
+               catch (CancelTaskException e) {
+                       throw e;
+               }
+               catch (Exception e) {
+                       throw new Exception("Error while performing a 
checkpoint", e);
+               }
+       }
+
+       @Override
+       public void abortCheckpointOnBarrier(long checkpointId) throws 
Exception {
+               LOG.debug("Aborting checkpoint via cancel-barrier {} for task 
{}", checkpointId, getName());
+
+               synchronized (lock) {
+                       if (isRunning) {
+                               
operatorChain.broadcastCheckpointCancelMarker(checkpointId);
+                       }
+               }
+       }
+
+       private boolean performCheckpoint(final long checkpointId, final long 
timestamp) throws Exception {
+
                LOG.debug("Starting checkpoint {} on task {}", checkpointId, 
getName());
-               
+
                synchronized (lock) {
                        if (isRunning) {
 
@@ -759,23 +782,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                return getName();
        }
 
-       protected final EventListener<CheckpointBarrier> 
getCheckpointBarrierListener() {
-               return new EventListener<CheckpointBarrier>() {
-                       @Override
-                       public void onEvent(CheckpointBarrier barrier) {
-                               try {
-                                       performCheckpoint(barrier.getId(), 
barrier.getTimestamp());
-                               }
-                               catch (CancelTaskException e) {
-                                       throw e;
-                               }
-                               catch (Exception e) {
-                                       throw new RuntimeException("Error 
triggering a checkpoint as the result of receiving checkpoint barrier", e);
-                               }
-                       }
-               };
-       }
-
        // 
------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
        
                this.inputProcessor = new StreamTwoInputProcessor<IN1, 
IN2>(inputList1, inputList2,
                                inputDeserializer1, inputDeserializer2,
-                               getCheckpointBarrierListener(),
+                               this,
                                configuration.getCheckpointMode(),
                                getEnvironment().getIOManager(),
                                isSerializingTimestamps());

Reply via email to