[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the 
Checkpoint Coordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a4fdfff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a4fdfff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a4fdfff

Branch: refs/heads/release-1.1
Commit: 1a4fdfff5d364a35e935604c0a5058a1a9f242f7
Parents: a1f028d
Author: Stephan Ewen <[email protected]>
Authored: Tue Nov 8 17:13:19 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 19:07:16 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  18 +-
 .../AlignmentLimitExceededException.java        |  33 +++
 .../decline/CheckpointDeclineException.java     |  35 +++
 ...ntDeclineOnCancellationBarrierException.java |  32 +++
 .../CheckpointDeclineSubsumedException.java     |  32 +++
 ...intDeclineTaskNotCheckpointingException.java |  32 +++
 .../CheckpointDeclineTaskNotReadyException.java |  32 +++
 .../decline/InputEndOfStreamException.java      |  32 +++
 .../flink/runtime/execution/Environment.java    |   9 +
 .../runtime/jobgraph/tasks/StatefulTask.java    |   3 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |  65 +++---
 .../runtime/taskmanager/RuntimeEnvironment.java |   7 +
 .../apache/flink/runtime/taskmanager/Task.java  |  22 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  41 +---
 .../savepoint/SavepointCoordinatorTest.java     |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   2 +-
 .../operators/testutils/DummyEnvironment.java   |   5 +
 .../operators/testutils/MockEnvironment.java    |   5 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  28 ++-
 .../streaming/runtime/io/BarrierTracker.java    |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |  25 ++-
 .../streaming/runtime/io/BarrierBufferTest.java |  31 +--
 .../runtime/io/BarrierTrackerTest.java          |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   2 +
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +
 .../StreamTaskCancellationBarrierTest.java      | 221 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |  31 ++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   2 +
 29 files changed, 632 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 409f05b..8661ddc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -575,6 +575,7 @@ public class CheckpointCoordinator {
                }
 
                final long checkpointId = message.getCheckpointId();
+               final String reason = (message.getReason() != null ? 
message.getReason().getMessage() : "");
 
                PendingCheckpoint checkpoint;
 
@@ -594,8 +595,8 @@ public class CheckpointCoordinator {
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
                                isPendingCheckpoint = true;
 
-                               LOG.info("Discarding checkpoint " + checkpointId
-                                       + " because of checkpoint decline from 
task " + message.getTaskExecutionId());
+                               LOG.info("Discarding checkpoint {} because of 
checkpoint decline from task {} : {}",
+                                               checkpointId, 
message.getTaskExecutionId(), reason);
 
                                pendingCheckpoints.remove(checkpointId);
                                checkpoint.discard(userClassLoader);
@@ -604,19 +605,14 @@ public class CheckpointCoordinator {
                                onCancelCheckpoint(checkpointId);
 
                                boolean haveMoreRecentPending = false;
-                               Iterator<Map.Entry<Long, PendingCheckpoint>> 
entries = pendingCheckpoints.entrySet().iterator();
-                               while (entries.hasNext()) {
-                                       PendingCheckpoint p = 
entries.next().getValue();
-                                       if (!p.isDiscarded() && 
p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+                               for (PendingCheckpoint p : 
pendingCheckpoints.values()) {
+                                       if (!p.isDiscarded() && 
p.getCheckpointId() >= checkpoint.getCheckpointId()) {
                                                haveMoreRecentPending = true;
                                                break;
                                        }
                                }
-                               if (!haveMoreRecentPending && 
!triggerRequestQueued) {
-                                       LOG.info("Triggering new checkpoint 
because of discarded checkpoint " + checkpointId);
-                                       
triggerCheckpoint(System.currentTimeMillis());
-                               } else if (!haveMoreRecentPending) {
-                                       LOG.info("Promoting queued checkpoint 
request because of discarded checkpoint " + checkpointId);
+
+                               if (!haveMoreRecentPending) {
                                        triggerQueuedRequests();
                                }
                        } else if (checkpoint != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
new file mode 100644
index 0000000..64d57bc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because too many bytes 
were
+ * buffered in the alignment phase.
+ */
+public final class AlignmentLimitExceededException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public AlignmentLimitExceededException(long numBytes) {
+               super("The checkpoint alignment phase needed to buffer more 
than the configured maximum ("
+                               + numBytes + " bytes).");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
new file mode 100644
index 0000000..8a2802c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Base class of all exceptions that indicate a declined checkpoint.
+ */
+public abstract class CheckpointDeclineException extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineException(String message) {
+               super(message);
+       }
+
+       public CheckpointDeclineException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
new file mode 100644
index 0000000..9ae4096
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a cancellation
+ * barrier was received.
+ */
+public final class CheckpointDeclineOnCancellationBarrierException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineOnCancellationBarrierException() {
+               super("Task received cancellation from one of its inputs");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
new file mode 100644
index 0000000..5380469
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a newer 
checkpoint
+ * barrier was received on an input before the pending checkpoint's barrier. 
+ */
+public final class CheckpointDeclineSubsumedException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineSubsumedException(long newCheckpointId) {
+               super("Checkpoint was canceled because a barrier from newer 
checkpoint " + newCheckpointId + " was received.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
new file mode 100644
index 0000000..e5773d1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task does not 
support
+ * checkpointing.
+ */
+public final class CheckpointDeclineTaskNotCheckpointingException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
+               super("Task '" + taskName + "'does not support checkpointing");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
new file mode 100644
index 0000000..a1214fe
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task was not
+ * ready to perform a checkpoint.
+ */
+public final class CheckpointDeclineTaskNotReadyException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineTaskNotReadyException(String taskName) {
+               super("Task " + taskName + " was not running");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
new file mode 100644
index 0000000..86b29dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because one of the input
+ * streams reached its end before the alignment was complete.
+ */
+public final class InputEndOfStreamException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public InputEndOfStreamException() {
+               super("Checkpoint was declined because one input stream is 
finished");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 5ad5fe2..e84a5e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -167,6 +167,15 @@ public interface Environment {
        void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
 
        /**
+        * Declines a checkpoint. This tells the checkpoint coordinator that 
this task will
+        * not be able to successfully complete a certain checkpoint.
+        * 
+        * @param checkpointId The ID of the declined checkpoint.
+        * @param cause An optional reason why the checkpoint was declined.
+        */
+       void declineCheckpoint(long checkpointId, Throwable cause);
+       
+       /**
         * Marks task execution failed for an external reason (a reason other 
than the task code itself
         * throwing an exception). If the task is already in a terminal state
         * (such as FINISHED, CANCELED, FAILED), or if the task is already 
canceling this does nothing.

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 7c581df..874ca4a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -66,8 +66,9 @@ public interface StatefulTask<T extends StateHandle<?>> {
         * {@link 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their 
outputs.
         * 
         * @param checkpointId The ID of the checkpoint to be aborted.
+        * @param cause The reason why the checkpoint was aborted during 
alignment   
         */
-       void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+       void abortCheckpointOnBarrier(long checkpointId, Throwable cause) 
throws Exception;
 
        /**
         * Invoked when a checkpoint has been completed, i.e., when the 
checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index f26d2fb..dca212c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,7 +19,14 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * This message is sent from the {@link 
org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -31,44 +38,48 @@ public class DeclineCheckpoint extends 
AbstractCheckpointMessage implements java
 
        private static final long serialVersionUID = 2094094662279578953L;
 
-       /** The timestamp associated with the checkpoint */
-       private final long timestamp;
+       /** The reason why the checkpoint was declined */
+       private final Throwable reason;
 
-       public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, long timestamp) {
-               super(job, taskExecutionId, checkpointId);
-               this.timestamp = timestamp;
+       public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId) {
+               this(job, taskExecutionId, checkpointId, null);
        }
 
-       // 
--------------------------------------------------------------------------------------------
+       public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, Throwable reason) {
+               super(job, taskExecutionId, checkpointId);
 
-       public long getTimestamp() {
-               return timestamp;
+               if (reason == null ||
+                               reason.getClass() == 
AlignmentLimitExceededException.class ||
+                               reason.getClass() == 
CheckpointDeclineOnCancellationBarrierException.class ||
+                               reason.getClass() == 
CheckpointDeclineSubsumedException.class ||
+                               reason.getClass() == 
CheckpointDeclineTaskNotCheckpointingException.class ||
+                               reason.getClass() == 
CheckpointDeclineTaskNotReadyException.class ||
+                               reason.getClass() == 
InputEndOfStreamException.class)
+               {
+                       // null or known common exceptions that cannot 
reference any dynamically loaded code
+                       this.reason = reason;
+               } else {
+                       // some other exception. replace with a serialized 
throwable, to be on the safe side
+                       this.reason = new SerializedThrowable(reason);
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
 
-       @Override
-       public int hashCode() {
-               return super.hashCode() + (int) (timestamp ^ (timestamp >>> 
32));
+       /**
+        * Gets the reason why the checkpoint was declined.
+        *
+        * @return The reason why the checkpoint was declined
+        */
+       public Throwable getReason() {
+               return reason;
        }
 
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               else if (o instanceof DeclineCheckpoint) {
-                       DeclineCheckpoint that = (DeclineCheckpoint) o;
-                       return this.timestamp == that.timestamp && 
super.equals(o);
-               }
-               else {
-                       return false;
-               }
-       }
+       // 
--------------------------------------------------------------------------------------------
 
        @Override
        public String toString() {
-               return String.format("Declined Checkpoint %d@%d for (%s/%s)",
-                               getCheckpointId(), getTimestamp(), getJob(), 
getTaskExecutionId());
+               return String.format("Declined Checkpoint %d for (%s/%s): %s",
+                               getCheckpointId(), getJob(), 
getTaskExecutionId(), reason);
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6fdf6f9..47149a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -268,6 +269,12 @@ public class RuntimeEnvironment implements Environment {
        }
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {
+               DeclineCheckpoint message = new DeclineCheckpoint(jobId, 
executionId, checkpointId, cause);
+               jobManager.tell(message);
+       }
+
+       @Override
        public void failExternally(Throwable cause) {
                this.containingTask.failExternally(cause);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ed15dbf..3eab7e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -969,13 +971,16 @@ public class Task implements Runnable {
                                                try {
                                                        boolean success = 
statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
                                                        if (!success) {
-                                                               
DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), 
checkpointID, checkpointTimestamp);
+                                                               // task was not 
ready to trigger this checkpoint
+                                                               
DeclineCheckpoint decline = new DeclineCheckpoint(
+                                                                               
jobId, getExecutionId(), checkpointID,
+                                                                               
new CheckpointDeclineTaskNotReadyException(taskName));
                                                                
jobManager.tell(decline);
                                                        }
                                                }
                                                catch (Throwable t) {
                                                        if (getExecutionState() 
== ExecutionState.RUNNING) {
-                                                               
failExternally(new RuntimeException(
+                                                               
failExternally(new Exception(
                                                                        "Error 
while triggering checkpoint for " + taskName,
                                                                        t));
                                                        }
@@ -987,10 +992,21 @@ public class Task implements Runnable {
                        else {
                                LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - "
                                                + taskNameWithSubtask);
+
+                               DeclineCheckpoint decline = new 
DeclineCheckpoint(
+                                               jobId, executionId, 
checkpointID,
+                                               new 
CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+                               jobManager.tell(decline);
                        }
                }
                else {
-                       LOG.debug("Ignoring request to trigger a checkpoint for 
non-running task.");
+                       LOG.debug("Declining checkpoint request for non-running 
task");
+
+                       // send back a message that we did not do the checkpoint
+                       DeclineCheckpoint decline = new DeclineCheckpoint(
+                                       jobId, executionId, checkpointID,
+                                       new 
CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
+                       jobManager.tell(decline);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..f3f988a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -288,44 +288,19 @@ public class CheckpointCoordinatorTest {
 
                        // decline checkpoint from the other task, this should 
cancel the checkpoint
                        // and trigger a new one
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
                        assertTrue(checkpoint.isDiscarded());
 
-                       // validate that we have a new pending checkpoint
-                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       // validate that we have no new pending checkpoint
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-                       long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-                       PendingCheckpoint checkpointNew = 
coord.getPendingCheckpoints().get(checkpointIdNew);
-
-                       assertNotNull(checkpointNew);
-                       assertEquals(checkpointIdNew, 
checkpointNew.getCheckpointId());
-                       assertEquals(jid, checkpointNew.getJobId());
-                       assertEquals(2, 
checkpointNew.getNumberOfNonAcknowledgedTasks());
-                       assertEquals(0, 
checkpointNew.getNumberOfAcknowledgedTasks());
-                       assertEquals(0, checkpointNew.getTaskStates().size());
-                       assertFalse(checkpointNew.isDiscarded());
-                       assertFalse(checkpointNew.isFullyAcknowledged());
-                       assertNotEquals(checkpoint.getCheckpointId(), 
checkpointNew.getCheckpointId());
-
-                       // check that the vertices received the new trigger 
checkpoint message
-                       {
-                               TriggerCheckpoint expectedMessage1 = new 
TriggerCheckpoint(jid, attemptID1, checkpointIdNew, 
checkpointNew.getCheckpointTimestamp());
-                               TriggerCheckpoint expectedMessage2 = new 
TriggerCheckpoint(jid, attemptID2, checkpointIdNew, 
checkpointNew.getCheckpointTimestamp());
-                               verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
-                               verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
-                       }
-
                        // decline again, nothing should happen
                        // decline from the other task, nothing should happen
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId));
                        assertTrue(checkpoint.isDiscarded());
 
-                       // should still have the same second checkpoint pending
-                       long checkpointIdNew2 = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-                       assertEquals(checkpointIdNew2, checkpointIdNew);
-
                        coord.shutdown();
                }
                catch (Exception e) {
@@ -422,7 +397,7 @@ public class CheckpointCoordinatorTest {
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
                        // and trigger a new one
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
                        assertTrue(checkpoint1.isDiscarded());
 
                        // validate that we have only one pending checkpoint 
left
@@ -446,8 +421,8 @@ public class CheckpointCoordinatorTest {
 
                        // decline again, nothing should happen
                        // decline from the other task, nothing should happen
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id));
                        assertTrue(checkpoint1.isDiscarded());
 
                        coord.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index b1b384d..dc2b23f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -202,7 +202,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 
                coordinator.receiveDeclineMessage(new DeclineCheckpoint(
                                jobId, 
vertices[1].getCurrentExecutionAttempt().getAttemptId(),
-                               checkpointId, 0));
+                               checkpointId));
 
 
                // The pending checkpoint is completed

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 4dfaf95..2e3bace 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -455,7 +455,7 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
                        throw new UnsupportedOperationException("should not be 
called!");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 5af34fb..ca68683 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
        public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {}
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {
+               throw new UnsupportedOperationException();
+       }
+       
+       @Override
        public void failExternally(Throwable cause) {
                throw new UnsupportedOperationException("DummyEnvironment does 
not support external task failure.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9dea324..22dee63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -291,6 +291,11 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public void failExternally(Throwable cause) {
                throw new UnsupportedOperationException("MockEnvironment does 
not support external task failure.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5b344eb..3ace0f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -226,7 +226,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
                        throw new UnsupportedOperationException("Should not be 
called");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 36de717..7a8e7d3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,10 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -142,7 +146,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                }
                                else {
                                        if (next.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                               
processEndOfPartition(next.getChannelIndex());
+                                               processEndOfPartition();
                                        }
                                        return next;
                                }
@@ -196,7 +200,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                                "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
 
                                // let the task know we are not completing this
-                               notifyAbort(currentCheckpointId);
+                               notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
 
                                // abort the current checkpoint
                                releaseBlocksAndResetBarriers();
@@ -241,7 +245,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (barrierId > currentCheckpointId) {
                                // new checkpoint
                                currentCheckpointId = barrierId;
-                               notifyAbort(barrierId);
+                               notifyAbortOnCancellationBarrier(barrierId);
                        }
                        return;
                }
@@ -258,7 +262,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                }
 
                                releaseBlocksAndResetBarriers();
-                               notifyAbort(barrierId);
+                               notifyAbortOnCancellationBarrier(barrierId);
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we canceled the next which also cancels the 
current
@@ -272,7 +276,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                currentCheckpointId = barrierId;
                                startOfAlignmentTimestamp = 0L;
                                latestAlignmentDurationNanos = 0L;
-                               notifyAbort(barrierId);
+                               notifyAbortOnCancellationBarrier(barrierId);
                        }
 
                        // else: ignore trailing (cancellation) barrier from an 
earlier checkpoint (obsolete now)
@@ -292,7 +296,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                LOG.debug("Checkpoint {} canceled, skipping 
alignment", barrierId);
                        }
 
-                       notifyAbort(barrierId);
+                       notifyAbortOnCancellationBarrier(barrierId);
                }
 
                // else: trailing barrier from either
@@ -300,12 +304,12 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                //   - the current checkpoint if it was already canceled
        }
 
-       private void processEndOfPartition(int channel) throws Exception {
+       private void processEndOfPartition() throws Exception {
                numClosedChannels++;
 
                if (numBarriersReceived > 0) {
                        // let the task know we skip a checkpoint
-                       notifyAbort(currentCheckpointId);
+                       notifyAbort(currentCheckpointId, new 
InputEndOfStreamException());
 
                        // no chance to complete this checkpoint
                        releaseBlocksAndResetBarriers();
@@ -319,9 +323,13 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                }
        }
 
-       private void notifyAbort(long checkpointId) throws Exception {
+       private void notifyAbortOnCancellationBarrier(long checkpointId) throws 
Exception {
+               notifyAbort(checkpointId, new 
CheckpointDeclineOnCancellationBarrierException());
+       }
+
+       private void notifyAbort(long checkpointId, CheckpointDeclineException 
cause) throws Exception {
                if (toNotifyOnCheckpoint != null) {
-                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 5157336..8b4cc48 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -227,7 +228,8 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
 
        private void notifyAbort(long checkpointId) throws Exception {
                if (toNotifyOnCheckpoint != null) {
-                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+                       toNotifyOnCheckpoint.abortCheckpointOnBarrier(
+                                       checkpointId, new 
CheckpointDeclineOnCancellationBarrierException());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d55a9c5..4f0839f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -592,13 +594,15 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
 
        @Override
-       public void abortCheckpointOnBarrier(long checkpointId) throws 
Exception {
+       public void abortCheckpointOnBarrier(long checkpointId, Throwable 
cause) throws Exception {
                LOG.debug("Aborting checkpoint via cancel-barrier {} for task 
{}", checkpointId, getName());
 
+               // notify the coordinator that we decline this checkpoint
+               getEnvironment().declineCheckpoint(checkpointId, cause);
+
+               // notify all downstream operators that they should not wait for a barrier from us
                synchronized (lock) {
-                       if (isRunning) {
-                               
operatorChain.broadcastCheckpointCancelMarker(checkpointId);
-                       }
+                       
operatorChain.broadcastCheckpointCancelMarker(checkpointId);
                }
        }
 
@@ -669,7 +673,18 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        checkpointThread.start();
                                }
                                return true;
-                       } else {
+                       }
+                       else {
+                               // we cannot perform our checkpoint - let the downstream operators know that they
+                               // should not wait for any input from this operator
+
+                               // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
+                               // yet be created
+                               final CancelCheckpointMarker message = new 
CancelCheckpointMarker(checkpointId);
+                               for (ResultPartitionWriter output : 
getEnvironment().getAllWriters()) {
+                                       output.writeEventToAllChannels(message);
+                               }
+
                                return false;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cf1f98e..20cb8f7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -566,7 +569,7 @@ public class BarrierBufferTest {
                        check(sequence[12], buffer.getNextNonBlocked());
                        assertEquals(3L, buffer.getCurrentCheckpointId());
                        validateAlignmentTime(startTs, buffer);
-                       verify(toNotify).abortCheckpointOnBarrier(2L);
+                       verify(toNotify).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineSubsumedException.class));
                        check(sequence[16], buffer.getNextNonBlocked());
 
                        // checkpoint 3 alignment in progress
@@ -574,7 +577,7 @@ public class BarrierBufferTest {
 
                        // checkpoint 3 aborted (end of partition)
                        check(sequence[20], buffer.getNextNonBlocked());
-                       verify(toNotify).abortCheckpointOnBarrier(3L);
+                       verify(toNotify).abortCheckpointOnBarrier(eq(3L), 
any(CheckpointDeclineSubsumedException.class));
 
                        // replay buffered data from checkpoint 3
                        check(sequence[18], buffer.getNextNonBlocked());
@@ -999,13 +1002,13 @@ public class BarrierBufferTest {
                check(sequence[6], buffer.getNextNonBlocked());
                assertEquals(5L, buffer.getCurrentCheckpointId());
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), 
anyLong());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), 
anyLong());
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[8], buffer.getNextNonBlocked());
                assertEquals(6L, buffer.getCurrentCheckpointId());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
                
                buffer.cleanup();
@@ -1073,7 +1076,7 @@ public class BarrierBufferTest {
                // canceled checkpoint on last barrier
                startTs = System.nanoTime();
                check(sequence[12], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                validateAlignmentTime(startTs, buffer);
                check(sequence[13], buffer.getNextNonBlocked());
 
@@ -1088,7 +1091,7 @@ public class BarrierBufferTest {
 
                // this checkpoint gets immediately canceled
                check(sequence[24], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // some buffers
@@ -1104,7 +1107,7 @@ public class BarrierBufferTest {
                check(sequence[33], buffer.getNextNonBlocked());
 
                check(sequence[37], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // all done
@@ -1167,7 +1170,7 @@ public class BarrierBufferTest {
 
                // re-read the queued cancellation barriers
                check(sequence[9], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[10], buffer.getNextNonBlocked());
@@ -1184,7 +1187,7 @@ public class BarrierBufferTest {
 
                // no further checkpoint (abort) notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(CheckpointDeclineOnCancellationBarrierException.class));
 
                // all done
                assertNull(buffer.getNextNonBlocked());
@@ -1253,7 +1256,7 @@ public class BarrierBufferTest {
                // cancelled by cancellation barrier
                check(sequence[4], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(1L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
 
                // the next checkpoint alignment starts now
                startTs = System.nanoTime();
@@ -1285,7 +1288,7 @@ public class BarrierBufferTest {
 
                // check overall notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
        /**
@@ -1337,7 +1340,7 @@ public class BarrierBufferTest {
                // future barrier aborts checkpoint
                startTs = System.nanoTime();
                check(sequence[3], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), 
any(CheckpointDeclineSubsumedException.class));
                check(sequence[4], buffer.getNextNonBlocked());
 
                // alignment of next checkpoint
@@ -1366,7 +1369,7 @@ public class BarrierBufferTest {
 
                // check overall notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
        // ------------------------------------------------------------------------
@@ -1471,7 +1474,7 @@ public class BarrierBufferTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {}
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {}
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 903f585..978c212 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -481,7 +481,7 @@ public class BarrierTrackerTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
                        assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
 
                        final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 5fcc59e..9003f0e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -270,6 +271,7 @@ public class OneInputStreamTaskTest {
                testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
                testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+               expectedOutput.add(new CancelCheckpointMarker(0));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
                expectedOutput.add(new CheckpointBarrier(1, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7084208..36ad8ff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,9 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {}
+
+       @Override
        public void failExternally(Throwable cause) {
                throw new UnsupportedOperationException("StreamMockEnvironment 
does not support external task failure.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
new file mode 100644
index 0000000..8b8b659
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", 
"org.apache.log4j.*"})
+public class StreamTaskCancellationBarrierTest {
+
+       /**
+        * This test checks that tasks emit a proper cancel checkpoint barrier, 
if a "trigger checkpoint" message
+        * comes before they are ready.
+        */
+       @Test
+       public void testEmitCancellationBarrierWhenNotReady() throws Exception {
+               StreamTask<String, ?> task = new InitBlockingTask();
+               StreamTaskTestHarness<String> testHarness = new 
StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+
+               // start the test - the task cannot get past the 'init()' 
method
+               testHarness.invoke();
+
+               // tell the task to commence a checkpoint
+               boolean result = task.triggerCheckpoint(41L, 
System.currentTimeMillis());
+               assertFalse("task triggered checkpoint though not ready", 
result);
+
+               // a cancellation barrier should be downstream
+               Object emitted = testHarness.getOutput().poll();
+               assertNotNull("nothing emitted", emitted);
+               assertTrue("wrong type emitted", emitted instanceof 
CancelCheckpointMarker);
+               assertEquals("wrong checkpoint id", 41L, 
((CancelCheckpointMarker) emitted).getCheckpointId());
+       }
+
+       /**
+        * This test verifies (for one input tasks) that the Stream tasks react 
the following way to
+        * receiving a checkpoint cancellation barrier:
+        *
+        *   - send a "decline checkpoint" notification out (to the JobManager)
+        *   - emit a cancellation barrier downstream
+        */
+       @Test
+       public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
+
+               OneInputStreamTask<String, String> task = new 
OneInputStreamTask<String, String>();
+               OneInputStreamTaskTestHarness<String, String> testHarness = new 
OneInputStreamTaskTestHarness<>(
+                               task,
+                               1, 2,
+                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
IdentityMap());
+               streamConfig.setStreamOperator(mapOperator);
+
+               StreamMockEnvironment environment = 
spy(testHarness.createEnvironment());
+
+               // start the task
+               testHarness.invoke(environment);
+               testHarness.waitForTaskRunning();
+
+               // emit cancellation barriers
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1);
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+               testHarness.waitForInputProcessing();
+
+               // the decline call should go to the coordinator
+               verify(environment, times(1)).declineCheckpoint(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+
+               // a cancellation barrier should be downstream
+               Object result = testHarness.getOutput().poll();
+               assertNotNull("nothing emitted", result);
+               assertTrue("wrong type emitted", result instanceof 
CancelCheckpointMarker);
+               assertEquals("wrong checkpoint id", 2L, 
((CancelCheckpointMarker) result).getCheckpointId());
+
+               // cancel and shutdown
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
+       /**
+        * This test verifies (for two input tasks) that the Stream tasks react 
the following way to
+        * receiving a checkpoint cancellation barrier:
+        *
+        *   - send a "decline checkpoint" notification out (to the JobManager)
+        *   - emit a cancellation barrier downstream
+        */
+       @Test
+       public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
+
+               TwoInputStreamTask<String, String, String> task = new 
TwoInputStreamTask<String, String, String>();
+               TwoInputStreamTaskTestHarness<String, String, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
+                               task,
+                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               CoStreamMap<String, String, String> op = new CoStreamMap<>(new 
UnionCoMap());
+               streamConfig.setStreamOperator(op);
+
+               StreamMockEnvironment environment = 
spy(testHarness.createEnvironment());
+
+               // start the task
+               testHarness.invoke(environment);
+               testHarness.waitForTaskRunning();
+
+               // emit cancellation barriers
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0);
+               testHarness.waitForInputProcessing();
+
+               // the decline call should go to the coordinator
+               verify(environment, times(1)).declineCheckpoint(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+
+               // a cancellation barrier should be downstream
+               Object result = testHarness.getOutput().poll();
+               assertNotNull("nothing emitted", result);
+               assertTrue("wrong type emitted", result instanceof 
CancelCheckpointMarker);
+               assertEquals("wrong checkpoint id", 2L, 
((CancelCheckpointMarker) result).getCheckpointId());
+
+               // cancel and shutdown
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test tasks / functions
+       // 
------------------------------------------------------------------------
+
+       private static class InitBlockingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
+
+               private final Object lock = new Object();
+               private volatile boolean running = true;
+
+               @Override
+               protected void init() throws Exception {
+                       synchronized (lock) {
+                               while (running) {
+                                       lock.wait();
+                               }
+                       }
+               }
+
+               @Override
+               protected void run() throws Exception {}
+
+               @Override
+               protected void cleanup() throws Exception {}
+
+               @Override
+               protected void cancelTask() throws Exception {
+                       running = false;
+                       synchronized (lock) {
+                               lock.notifyAll();
+                       }
+               }
+       }
+
+       private static class IdentityMap implements MapFunction<String, String> 
{
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map(String value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class UnionCoMap implements CoMapFunction<String, 
String, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map1(String value) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public String map2(String value) throws Exception {
+                       return value;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 00e95b9..0bd8d9a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -150,21 +150,18 @@ public class StreamTaskTestHarness<OUT> {
 
        }
 
+       public StreamMockEnvironment createEnvironment() {
+               return new StreamMockEnvironment(
+                               jobConfig, taskConfig, executionConfig, 
memorySize, new MockInputSplitProvider(), bufferSize);
+       }
+       
        /**
         * Invoke the Task. This resets the output of any previous invocation. 
This will start a new
         * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} 
to wait for the
         * Task thread to finish running.
         */
        public void invoke() throws Exception {
-               mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, 
executionConfig,
-                       memorySize, new MockInputSplitProvider(), bufferSize);
-               task.setEnvironment(mockEnv);
-
-               initializeInputs();
-               initializeOutput();
-
-               taskThread = new TaskThread(task);
-               taskThread.start();
+               invoke(createEnvironment());
        }
 
        /**
@@ -205,7 +202,7 @@ public class StreamTaskTestHarness<OUT> {
                        if (taskThread.task instanceof StreamTask) {
                                StreamTask<?, ?> streamTask = (StreamTask<?, 
?>) taskThread.task;
                                while (!streamTask.isRunning()) {
-                                       Thread.sleep(100);
+                                       Thread.sleep(10);
                                        if (!taskThread.isAlive()) {
                                                if (taskThread.getError() != 
null) {
                                                        throw new 
Exception("Task Thread failed due to an error.", taskThread.getError());
@@ -282,15 +279,15 @@ public class StreamTaskTestHarness<OUT> {
        /**
         * This only returns after all input queues are empty.
         */
-       public void waitForInputProcessing() {
-
-
+       public void waitForInputProcessing() throws Exception {
                // first wait for all input queues to be empty
-               try {
-                       Thread.sleep(1);
-               } catch (InterruptedException ignored) {}
-               
+
                while (true) {
+                       Throwable error = taskThread.getError();
+                       if (error != null) {
+                               throw new Exception("Exception in the task 
thread", error);
+                       }
+
                        boolean allEmpty = true;
                        for (int i = 0; i < numInputGates; i++) {
                                if (!inputGates[i].allQueuesEmpty()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index b9211b1..92f8553 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -290,6 +291,7 @@ public class TwoInputStreamTaskTest {
                testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
                testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+               expectedOutput.add(new CancelCheckpointMarker(0));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
                expectedOutput.add(new CheckpointBarrier(1, 1));

Reply via email to