[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the 
Checkpoint Coordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48a48139
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48a48139
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48a48139

Branch: refs/heads/master
Commit: 48a48139172e86f548f3b2f1564bdc948c3fe76a
Parents: 0a79dd5
Author: Stephan Ewen <[email protected]>
Authored: Wed Nov 2 22:34:59 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Nov 8 21:15:34 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  77 +++----
 .../decline/CheckpointDeclineException.java     |  35 +++
 ...ntDeclineOnCancellationBarrierException.java |  32 +++
 .../CheckpointDeclineSubsumedException.java     |  32 +++
 ...intDeclineTaskNotCheckpointingException.java |  32 +++
 .../CheckpointDeclineTaskNotReadyException.java |  32 +++
 .../decline/InputEndOfStreamException.java      |  32 +++
 .../flink/runtime/execution/Environment.java    |  13 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |   3 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |  65 +++---
 .../ActorGatewayCheckpointResponder.java        |  12 +-
 .../taskmanager/CheckpointResponder.java        |   6 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |   5 +
 .../apache/flink/runtime/taskmanager/Task.java  |  25 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |   5 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 175 ++-------------
 .../jobmanager/JobManagerHARecoveryTest.java    |   2 +-
 .../operators/testutils/DummyEnvironment.java   |   5 +
 .../operators/testutils/MockEnvironment.java    |   7 +
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   6 +-
 .../streaming/runtime/io/BarrierBuffer.java     |  28 ++-
 .../streaming/runtime/io/BarrierTracker.java    |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |  33 ++-
 .../streaming/runtime/io/BarrierBufferTest.java |  33 +--
 .../runtime/io/BarrierTrackerTest.java          |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   4 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +
 .../StreamTaskCancellationBarrierTest.java      | 214 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |  31 ++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   4 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   2 +-
 31 files changed, 643 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 26702c9..8088c3d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
@@ -278,10 +279,10 @@ public class CheckpointCoordinator {
 
                if (result.isSuccess()) {
                        return 
result.getPendingCheckpoint().getCompletionFuture();
-               } else {
+               }
+               else {
                        Throwable cause = new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message());
-                       Future<CompletedCheckpoint> failed = 
FlinkCompletableFuture.completedExceptionally(cause);
-                       return failed;
+                       return 
FlinkCompletableFuture.completedExceptionally(cause);
                }
        }
 
@@ -299,6 +300,7 @@ public class CheckpointCoordinator {
                return triggerCheckpoint(timestamp, checkpointProperties, 
checkpointDirectory, isPeriodic).isSuccess();
        }
 
+       @VisibleForTesting
        CheckpointTriggerResult triggerCheckpoint(
                        long timestamp,
                        CheckpointProperties props,
@@ -397,7 +399,7 @@ public class CheckpointCoordinator {
 
                // we lock with a special lock to make sure that trigger 
requests do not overtake each other.
                // this is not done with the coordinator-wide lock, because the 
'checkpointIdCounter'
-               // may issue blocking operations. Using a different lock than 
teh coordinator-wide lock,
+               // may issue blocking operations. Using a different lock than 
the coordinator-wide lock,
                // we avoid blocking the processing of 'acknowledge/decline' 
messages during that time.
                synchronized (triggerLock) {
                        final long checkpointID;
@@ -525,81 +527,74 @@ public class CheckpointCoordinator {
        }
 
        /**
-        * Receives a {@link DeclineCheckpoint} message and returns whether the
-        * message was associated with a pending checkpoint.
+        * Receives a {@link DeclineCheckpoint} message for a pending 
checkpoint.
         *
         * @param message Checkpoint decline from the task manager
-        *
-        * @return Flag indicating whether the declined checkpoint was 
associated
-        * with a pending checkpoint.
         */
-       public boolean receiveDeclineMessage(DeclineCheckpoint message) throws 
Exception {
+       public void receiveDeclineMessage(DeclineCheckpoint message) throws 
Exception {
                if (shutdown || message == null) {
-                       return false;
+                       return;
                }
                if (!job.equals(message.getJob())) {
-                       LOG.error("Received DeclineCheckpoint message for wrong 
job: {}", message);
-                       return false;
+                       throw new IllegalArgumentException("Received 
DeclineCheckpoint message for job " +
+                               message.getJob() + " while this coordinator 
handles job " + job);
                }
 
                final long checkpointId = message.getCheckpointId();
+               final String reason = (message.getReason() != null ? 
message.getReason().getMessage() : "");
 
                PendingCheckpoint checkpoint;
 
-               // Flag indicating whether the ack message was for a known 
pending
-               // checkpoint.
-               boolean isPendingCheckpoint;
-
                synchronized (lock) {
                        // we need to check inside the lock for being shutdown 
as well, otherwise we
                        // get races and invalid error log messages
                        if (shutdown) {
-                               return false;
+                               return;
                        }
 
                        checkpoint = pendingCheckpoints.get(checkpointId);
 
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
-                               isPendingCheckpoint = true;
-
-                               LOG.info("Discarding checkpoint " + checkpointId
-                                               + " because of checkpoint 
decline from task " + message.getTaskExecutionId());
+                               LOG.info("Discarding checkpoint {} because of 
checkpoint decline from task {} : {}",
+                                               checkpointId, 
message.getTaskExecutionId(), reason);
 
                                pendingCheckpoints.remove(checkpointId);
                                checkpoint.abortDeclined();
                                rememberRecentCheckpointId(checkpointId);
 
-                               boolean haveMoreRecentPending = false;
+                               // we don't have to schedule another 
"dissolving" checkpoint any more because the
+                               // cancellation barriers take care of breaking 
downstream alignments;
+                               // we only need to make sure that suspended 
queued requests are resumed
 
+                               boolean haveMoreRecentPending = false;
                                for (PendingCheckpoint p : 
pendingCheckpoints.values()) {
-                                       if (!p.isDiscarded() && 
p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+                                       if (!p.isDiscarded() && 
p.getCheckpointId() >= checkpoint.getCheckpointId()) {
                                                haveMoreRecentPending = true;
                                                break;
                                        }
                                }
-                               if (!haveMoreRecentPending && 
!triggerRequestQueued) {
-                                       LOG.info("Triggering new checkpoint 
because of discarded checkpoint " + checkpointId);
-                                       
triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), 
checkpoint.getTargetDirectory(), checkpoint.isPeriodic());
-                               } else if (!haveMoreRecentPending) {
-                                       LOG.info("Promoting queued checkpoint 
request because of discarded checkpoint " + checkpointId);
+
+                               if (!haveMoreRecentPending) {
                                        triggerQueuedRequests();
                                }
-                       } else if (checkpoint != null) {
+                       }
+                       else if (checkpoint != null) {
                                // this should not happen
                                throw new IllegalStateException(
                                                "Received message for discarded 
but non-removed checkpoint " + checkpointId);
-                       } else {
-                               // message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
+                       }
+                       else if (LOG.isDebugEnabled()) {
                                if 
(recentPendingCheckpoints.contains(checkpointId)) {
-                                       isPendingCheckpoint = true;
-                                       LOG.info("Received another decline 
checkpoint message for now expired checkpoint attempt " + checkpointId);
+                                       // message is for a recently pending 
checkpoint and comes too late (checkpoint disposed)
+                                       LOG.debug("Received another decline 
message for now expired checkpoint attempt {} : {}",
+                                                       checkpointId, reason);
                                } else {
-                                       isPendingCheckpoint = false;
+                                       // message is for an unknown 
checkpoint. might be so old that we don't even remember it any more
+                                       LOG.debug("Received decline message for 
unknown (too old?) checkpoint attempt {} : {}",
+                                                       checkpointId, reason);
                                }
                        }
                }
-
-               return isPendingCheckpoint;
        }
 
        /**
@@ -643,9 +638,7 @@ public class CheckpointCoordinator {
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
                                isPendingCheckpoint = true;
 
-                               if (checkpoint.acknowledgeTask(
-                                               message.getTaskExecutionId(),
-                                               message.getSubtaskState())) {
+                               if 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), 
message.getSubtaskState())) {
                                        if (checkpoint.isFullyAcknowledged()) {
                                                completed = 
checkpoint.finalizeCheckpoint();
 
@@ -672,8 +665,8 @@ public class CheckpointCoordinator {
                                        }
                                } else {
                                        // checkpoint did not accept message
-                                       LOG.error("Received duplicate or 
invalid acknowledge message for checkpoint " + checkpointId
-                                                       + " , task " + 
message.getTaskExecutionId());
+                                       LOG.error("Received duplicate or 
invalid acknowledge message for checkpoint {} , task {}",
+                                                       checkpointId, 
message.getTaskExecutionId());
                                }
                        }
                        else if (checkpoint != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
new file mode 100644
index 0000000..8a2802c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Base class of all exceptions that indicate a declined checkpoint.
+ */
+public abstract class CheckpointDeclineException extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineException(String message) {
+               super(message);
+       }
+
+       public CheckpointDeclineException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
new file mode 100644
index 0000000..9ae4096
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a cancellation
+ * barrier was received.
+ */
+public final class CheckpointDeclineOnCancellationBarrierException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineOnCancellationBarrierException() {
+               super("Task received cancellation from one of its inputs");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
new file mode 100644
index 0000000..5380469
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a newer 
checkpoint
+ * barrier was received on an input before the pending checkpoint's barrier. 
+ */
+public final class CheckpointDeclineSubsumedException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineSubsumedException(long newCheckpointId) {
+               super("Checkpoint was canceled because a barrier from newer 
checkpoint " + newCheckpointId + " was received.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
new file mode 100644
index 0000000..e5773d1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task does not 
support
+ * checkpointing.
+ */
+public final class CheckpointDeclineTaskNotCheckpointingException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
+               super("Task '" + taskName + "'does not support checkpointing");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
new file mode 100644
index 0000000..a1214fe
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task was not
+ * ready to perform a checkpoint.
+ */
+public final class CheckpointDeclineTaskNotReadyException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public CheckpointDeclineTaskNotReadyException(String taskName) {
+               super("Task " + taskName + " was not running");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
new file mode 100644
index 0000000..86b29dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because one of the input
+ * streams reached its end before the alignment was complete.
+ */
+public final class InputEndOfStreamException extends 
CheckpointDeclineException {
+
+       private static final long serialVersionUID = 1L;
+
+       public InputEndOfStreamException() {
+               super("Checkpoint was declined because one input stream is 
finished");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index af1a640..8874eca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -174,9 +174,16 @@ public interface Environment {
         * @param checkpointMetaData the meta data for this checkpoint
         * @param subtaskState All state handles for the checkpointed state
         */
-       void acknowledgeCheckpoint(
-                       CheckpointMetaData checkpointMetaData,
-                       SubtaskState subtaskState);
+       void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, 
SubtaskState subtaskState);
+
+       /**
+        * Declines a checkpoint. This tells the checkpoint coordinator that 
this task will
+        * not be able to successfully complete a certain checkpoint.
+        * 
+        * @param checkpointId The ID of the declined checkpoint.
+        * @param cause An optional reason why the checkpoint was declined.
+        */
+       void declineCheckpoint(long checkpointId, Throwable cause);
 
        /**
         * Marks task execution failed for an external reason (a reason other 
than the task code itself

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index c91dcf2..39ddc961 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -68,8 +68,9 @@ public interface StatefulTask {
         * {@link 
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their 
outputs.
         * 
         * @param checkpointId The ID of the checkpoint to be aborted.
+        * @param cause The reason why the checkpoint was aborted during 
alignment   
         */
-       void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+       void abortCheckpointOnBarrier(long checkpointId, Throwable cause) 
throws Exception;
 
        /**
         * Invoked when a checkpoint has been completed, i.e., when the 
checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index f26d2fb..830b751 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,7 +19,14 @@
 package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * This message is sent from the {@link 
org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -31,44 +38,48 @@ public class DeclineCheckpoint extends 
AbstractCheckpointMessage implements java
 
        private static final long serialVersionUID = 2094094662279578953L;
 
-       /** The timestamp associated with the checkpoint */
-       private final long timestamp;
+       /** The reason why the checkpoint was declined */
+       private final Throwable reason;
 
-       public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, long timestamp) {
-               super(job, taskExecutionId, checkpointId);
-               this.timestamp = timestamp;
+       public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId) {
+               this(job, taskExecutionId, checkpointId, null);
        }
 
-       // 
--------------------------------------------------------------------------------------------
-
-       public long getTimestamp() {
-               return timestamp;
+       public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, Throwable reason) {
+               super(job, taskExecutionId, checkpointId);
+               
+               if (reason == null ||
+                       reason.getClass() == 
AlignmentLimitExceededException.class ||
+                       reason.getClass() == 
CheckpointDeclineOnCancellationBarrierException.class ||
+                       reason.getClass() == 
CheckpointDeclineSubsumedException.class ||
+                       reason.getClass() == 
CheckpointDeclineTaskNotCheckpointingException.class ||
+                       reason.getClass() == 
CheckpointDeclineTaskNotReadyException.class ||
+                       reason.getClass() == InputEndOfStreamException.class)
+               {
+                       // null or known common exceptions that cannot 
reference any dynamically loaded code
+                       this.reason = reason;
+               } else {
+                       // some other exception. replace with a serialized 
throwable, to be on the safe side
+                       this.reason = new SerializedThrowable(reason);
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
 
-       @Override
-       public int hashCode() {
-               return super.hashCode() + (int) (timestamp ^ (timestamp >>> 
32));
+       /**
+        * Gets the reason why the checkpoint was declined.
+        * 
+        * @return The reason why the checkpoint was declined
+        */
+       public Throwable getReason() {
+               return reason;
        }
 
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               else if (o instanceof DeclineCheckpoint) {
-                       DeclineCheckpoint that = (DeclineCheckpoint) o;
-                       return this.timestamp == that.timestamp && 
super.equals(o);
-               }
-               else {
-                       return false;
-               }
-       }
+       // 
--------------------------------------------------------------------------------------------
 
        @Override
        public String toString() {
-               return String.format("Declined Checkpoint %d@%d for (%s/%s)",
-                               getCheckpointId(), getTimestamp(), getJob(), 
getTaskExecutionId());
+               return String.format("Declined Checkpoint %d for (%s/%s): %s",
+                               getCheckpointId(), getJob(), 
getTaskExecutionId(), reason);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 38defcc..dafcefe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -54,17 +54,17 @@ public class ActorGatewayCheckpointResponder implements 
CheckpointResponder {
 
        @Override
        public void declineCheckpoint(
-               JobID jobID,
-               ExecutionAttemptID executionAttemptID,
-               CheckpointMetaData checkpointMetaData) {
+                       JobID jobID,
+                       ExecutionAttemptID executionAttemptID,
+                       long checkpointId,
+                       Throwable reason) {
 
                DeclineCheckpoint decline = new DeclineCheckpoint(
                        jobID,
                        executionAttemptID,
-                       checkpointMetaData.getCheckpointId(),
-                       checkpointMetaData.getTimestamp());
+                       checkpointId,
+                       reason);
 
                actorGateway.tell(decline);
-
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 7dbb76c..cdf87d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -52,10 +52,12 @@ public interface CheckpointResponder {
         *
         * @param jobID Job ID of the running job
         * @param executionAttemptID Execution attempt ID of the running task
-        * @param checkpointMetaData Meta data for this checkpoint
+        * @param checkpointId The ID of the declined checkpoint
+        * @param cause The optional reason why the checkpoint was declined
         */
        void declineCheckpoint(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
-               CheckpointMetaData checkpointMetaData);
+               long checkpointId,
+               Throwable cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index fa69a60..7fe94a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -254,6 +254,11 @@ public class RuntimeEnvironment implements Environment {
        }
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {
+               checkpointResponder.declineCheckpoint(jobId, executionId, 
checkpointId, cause);
+       }
+
+       @Override
        public void failExternally(Throwable cause) {
                this.containingTask.failExternally(cause);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 313973f..ffcf909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -1008,15 +1010,13 @@ public class Task implements Runnable, TaskActions {
         * @param checkpointID The ID identifying the checkpoint.
         * @param checkpointTimestamp The timestamp associated with the 
checkpoint.
         */
-       public void triggerCheckpointBarrier(long checkpointID, long 
checkpointTimestamp) {
-
-               AbstractInvokable invokable = this.invokable;
-
+       public void triggerCheckpointBarrier(final long checkpointID, long 
checkpointTimestamp) {
+               final AbstractInvokable invokable = this.invokable;
                final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointID, checkpointTimestamp);
 
                if (executionState == ExecutionState.RUNNING && invokable != 
null) {
-                       if (invokable instanceof StatefulTask) {
 
+                       if (invokable instanceof StatefulTask) {
                                // build a local closure
                                final StatefulTask statefulTask = 
(StatefulTask) invokable;
                                final String taskName = taskNameWithSubtask;
@@ -1027,12 +1027,14 @@ public class Task implements Runnable, TaskActions {
                                                try {
                                                        boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData);
                                                        if (!success) {
-                                                               
checkpointResponder.declineCheckpoint(jobId, getExecutionId(), 
checkpointMetaData);
+                                                               
checkpointResponder.declineCheckpoint(
+                                                                               
getJobID(), getExecutionId(), checkpointID,
+                                                                               
new CheckpointDeclineTaskNotReadyException(taskName));
                                                        }
                                                }
                                                catch (Throwable t) {
                                                        if (getExecutionState() 
== ExecutionState.RUNNING) {
-                                                               
failExternally(new RuntimeException(
+                                                               
failExternally(new Exception(
                                                                        "Error 
while triggering checkpoint for " + taskName,
                                                                        t));
                                                        }
@@ -1042,12 +1044,19 @@ public class Task implements Runnable, TaskActions {
                                executeAsyncCallRunnable(runnable, "Checkpoint 
Trigger for " + taskName);
                        }
                        else {
+                               checkpointResponder.declineCheckpoint(jobId, 
executionId, checkpointID,
+                                               new 
CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+
                                LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - "
                                                + taskNameWithSubtask);
                        }
                }
                else {
-                       LOG.debug("Ignoring request to trigger a checkpoint for 
non-running task.");
+                       LOG.debug("Declining checkpoint request for non-running 
task");
+
+                       // send back a message that we did not do the checkpoint
+                       checkpointResponder.declineCheckpoint(jobId, 
executionId, checkpointID,
+                                       new 
CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 68e71ef..31f9dd7 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1435,10 +1435,7 @@ class JobManager(
             if (checkpointCoordinator != null) {
               future {
                 try {
-                  if 
(!checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
-                    log.info("Received message for non-existing checkpoint " +
-                      declineMessage.getCheckpointId)
-                  }
+                 checkpointCoordinator.receiveDeclineMessage(declineMessage)
                 }
                 catch {
                   case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index b874612..5c50c02 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,8 +36,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
-import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -311,11 +309,8 @@ public class CheckpointCoordinatorTest {
                        assertFalse(checkpoint.isFullyAcknowledged());
 
                        // check that the vertices received the trigger 
checkpoint message
-                       {
-                               TriggerCheckpoint expectedMessage2 = new 
TriggerCheckpoint(jid, attemptID2, checkpointId, timestamp);
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-                       }
+                       
verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp);
+                       
verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, 
timestamp);
 
                        CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
@@ -334,42 +329,19 @@ public class CheckpointCoordinatorTest {
 
                        // decline checkpoint from the other task, this should 
cancel the checkpoint
                        // without triggering a new one
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
                        assertTrue(checkpoint.isDiscarded());
 
-                       // validate that we have a new pending checkpoint
-                       assertEquals(1, coord.getNumberOfPendingCheckpoints());
+                       // validate that we have no new pending checkpoint
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-                       long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-                       PendingCheckpoint checkpointNew = 
coord.getPendingCheckpoints().get(checkpointIdNew);
-
-                       assertNotNull(checkpointNew);
-                       assertEquals(checkpointIdNew, 
checkpointNew.getCheckpointId());
-                       assertEquals(jid, checkpointNew.getJobId());
-                       assertEquals(2, 
checkpointNew.getNumberOfNonAcknowledgedTasks());
-                       assertEquals(0, 
checkpointNew.getNumberOfAcknowledgedTasks());
-                       assertEquals(0, checkpointNew.getTaskStates().size());
-                       assertFalse(checkpointNew.isDiscarded());
-                       assertFalse(checkpointNew.isFullyAcknowledged());
-                       assertNotEquals(checkpoint.getCheckpointId(), 
checkpointNew.getCheckpointId());
-
-                       // check that the vertices received the new trigger 
checkpoint message
-                       {
-                               verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), 
eq(checkpointNew.getCheckpointTimestamp()));
-                               verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), 
eq(checkpointNew.getCheckpointTimestamp()));
-                       }
-
                        // decline again, nothing should happen
                        // decline from the other task, nothing should happen
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId));
                        assertTrue(checkpoint.isDiscarded());
 
-                       // should still have the same second checkpoint pending
-                       long checkpointIdNew2 = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-                       assertEquals(checkpointIdNew2, checkpointIdNew);
-
                        coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
@@ -464,7 +436,7 @@ public class CheckpointCoordinatorTest {
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
                        // without triggering a new one
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
                        assertTrue(checkpoint1.isDiscarded());
 
                        // validate that we have only one pending checkpoint 
left
@@ -488,8 +460,8 @@ public class CheckpointCoordinatorTest {
 
                        // decline again, nothing should happen
                        // decline from the other task, nothing should happen
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
-                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id));
+                       coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id));
                        assertTrue(checkpoint1.isDiscarded());
 
                        coord.shutdown(JobStatus.FINISHED);
@@ -1349,8 +1321,6 @@ public class CheckpointCoordinatorTest {
                        verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
                        verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
 
-                       NotifyCheckpointComplete confirmMessage1 = new 
NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew);
-                       NotifyCheckpointComplete confirmMessage2 = new 
NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew);
                        verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
                        verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
                }
@@ -1765,11 +1735,7 @@ public class CheckpointCoordinatorTest {
                assertFalse("Did not trigger savepoint", savepoint1.isDone());
        }
 
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-/**
+       /**
         * Tests that the checkpointed partitioned and non-partitioned state is 
assigned properly to
         * the {@link Execution} upon recovery.
         *
@@ -2328,30 +2294,6 @@ public class CheckpointCoordinatorTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       static void sendAckMessageToCoordinator(
-                       CheckpointCoordinator coord,
-                       long checkpointId, JobID jid,
-                       ExecutionJobVertex jobVertex,
-                       JobVertexID jobVertexID,
-                       List<KeyGroupRange> keyGroupPartitions) throws 
Exception {
-
-               for (int index = 0; index < jobVertex.getParallelism(); 
index++) {
-                       ChainedStateHandle<StreamStateHandle> state = 
generateStateForVertex(jobVertexID, index);
-                       KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID,
-                                       keyGroupPartitions.get(index), false);
-
-                       SubtaskState checkpointStateHandles = new 
SubtaskState(state, null, null, keyGroupState, null, 0);
-                       AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       new CheckpointMetaData(checkpointId, 
0L),
-                                       checkpointStateHandles);
-
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
-               }
-       }
-
        public static KeyGroupsStateHandle generateKeyGroupState(
                        JobVertexID jobVertexID,
                        KeyGroupRange keyGroupPartition, boolean rawState) 
throws IOException {
@@ -2467,7 +2409,7 @@ public class CheckpointCoordinatorTest {
                return generateChainedPartitionableStateHandle(statesListsMap);
        }
 
-       public static ChainedStateHandle<OperatorStateHandle> 
generateChainedPartitionableStateHandle(
+       private static ChainedStateHandle<OperatorStateHandle> 
generateChainedPartitionableStateHandle(
                        Map<String, List<? extends Serializable>> states) 
throws IOException {
 
                List<List<? extends Serializable>> namedStateSerializables = 
new ArrayList<>(states.size());
@@ -2495,7 +2437,7 @@ public class CheckpointCoordinatorTest {
                return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
        }
 
-       public static ExecutionJobVertex mockExecutionJobVertex(
+       private static ExecutionJobVertex mockExecutionJobVertex(
                JobVertexID jobVertexID,
                int parallelism,
                int maxParallelism) {
@@ -2847,95 +2789,4 @@ public class CheckpointCoordinatorTest {
                Assert.assertEquals(expectedTotalPartitions, 
actualTotalPartitions);
                Assert.assertEquals(expected, actual);
        }
-
-       @Test
-       public void testDeclineCheckpointRespectsProperties() throws Exception {
-               final JobID jid = new JobID();
-               final long timestamp = System.currentTimeMillis();
-
-               // create some mock Execution vertices that receive the 
checkpoint trigger messages
-               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
-               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
-
-               // set up the coordinator and validate the initial state
-               CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               600000,
-                               600000,
-                               0,
-                               Integer.MAX_VALUE,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new ExecutionVertex[] { vertex1 },
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1),
-                               null,
-                               new DisabledCheckpointStatsTracker());
-
-               assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
-
-               // trigger the first checkpoint. this should succeed
-               CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
-               String targetDirectory = "xjasdkjakshdmmmxna";
-
-               CheckpointTriggerResult triggerResult = 
coord.triggerCheckpoint(timestamp, props, targetDirectory, false);
-               assertTrue(triggerResult.isSuccess());
-
-               // validate that we have a pending checkpoint
-               assertEquals(1, coord.getNumberOfPendingCheckpoints());
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
-
-               long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-               PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
-
-               assertNotNull(checkpoint);
-               assertEquals(checkpointId, checkpoint.getCheckpointId());
-               assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
-               assertEquals(jid, checkpoint.getJobId());
-               assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
-               assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-               assertEquals(0, checkpoint.getTaskStates().size());
-               assertFalse(checkpoint.isDiscarded());
-               assertFalse(checkpoint.isFullyAcknowledged());
-               assertEquals(props, checkpoint.getProps());
-               assertEquals(targetDirectory, checkpoint.getTargetDirectory());
-
-               {
-                       // check that the vertices received the trigger 
checkpoint message
-                       verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-               }
-
-               // decline checkpoint, this should cancel the checkpoint and 
re-trigger with correct properties
-               coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
-               assertTrue(checkpoint.isDiscarded());
-
-               // validate that we have a new pending checkpoint
-               assertEquals(1, coord.getNumberOfPendingCheckpoints());
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
-
-               long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-               PendingCheckpoint checkpointNew = 
coord.getPendingCheckpoints().get(checkpointIdNew);
-
-               assertNotNull(checkpointNew);
-               assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
-               assertEquals(jid, checkpointNew.getJobId());
-               assertEquals(1, 
checkpointNew.getNumberOfNonAcknowledgedTasks());
-               assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
-               assertEquals(0, checkpointNew.getTaskStates().size());
-               assertFalse(checkpointNew.isDiscarded());
-               assertFalse(checkpointNew.isFullyAcknowledged());
-               assertNotEquals(checkpoint.getCheckpointId(), 
checkpointNew.getCheckpointId());
-               // Respect the properties and target directory from the initial 
trigger
-               assertEquals(props, checkpointNew.getProps());
-               assertEquals(targetDirectory, 
checkpointNew.getTargetDirectory());
-
-               // check that the vertices received the new trigger checkpoint 
message
-               {
-                       verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).triggerCheckpoint(eq(checkpointIdNew), 
eq(checkpointNew.getCheckpointTimestamp()));
-               }
-
-               coord.shutdown(JobStatus.FINISHED);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index e40b2df..7d9c521 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -476,7 +476,7 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
                        throw new UnsupportedOperationException("should not be 
called!");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index f2616b5..0b11730 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -159,6 +159,11 @@ public class DummyEnvironment implements Environment {
        }
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public void failExternally(Throwable cause) {
                throw new UnsupportedOperationException("DummyEnvironment does 
not support external task failure.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 08b84cb..646c038 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -313,10 +313,17 @@ public class MockEnvironment implements Environment {
 
        @Override
        public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData) {
+               throw new UnsupportedOperationException();
        }
 
        @Override
        public void acknowledgeCheckpoint(CheckpointMetaData 
checkpointMetaData, SubtaskState subtaskState) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {
+               throw new UnsupportedOperationException();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ab29bb0..0a522af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -205,9 +205,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void setInitialState(TaskStateHandles taskStateHandles) 
throws Exception {
-
-               }
+               public void setInitialState(TaskStateHandles taskStateHandles) 
throws Exception {}
 
                @Override
                public boolean triggerCheckpoint(CheckpointMetaData 
checkpointMetaData) {
@@ -232,7 +230,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
                        throw new UnsupportedOperationException("Should not be 
called");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 3ccb575..66aaa44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,7 +18,11 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -143,7 +147,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                }
                                else {
                                        if (next.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                               
processEndOfPartition(next.getChannelIndex());
+                                               processEndOfPartition();
                                        }
                                        return next;
                                }
@@ -197,7 +201,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                                "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
 
                                // let the task know we are not completing this
-                               notifyAbort(currentCheckpointId);
+                               notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
 
                                // abort the current checkpoint
                                releaseBlocksAndResetBarriers();
@@ -242,7 +246,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        if (barrierId > currentCheckpointId) {
                                // new checkpoint
                                currentCheckpointId = barrierId;
-                               notifyAbort(barrierId);
+                               notifyAbortOnCancellationBarrier(barrierId);
                        }
                        return;
                }
@@ -259,7 +263,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                }
 
                                releaseBlocksAndResetBarriers();
-                               notifyAbort(barrierId);
+                               notifyAbortOnCancellationBarrier(barrierId);
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we canceled the next which also cancels the 
current
@@ -273,7 +277,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                currentCheckpointId = barrierId;
                                startOfAlignmentTimestamp = 0L;
                                latestAlignmentDurationNanos = 0L;
-                               notifyAbort(barrierId);
+                               notifyAbortOnCancellationBarrier(barrierId);
                        }
 
                        // else: ignore trailing (cancellation) barrier from an 
earlier checkpoint (obsolete now)
@@ -293,7 +297,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                LOG.debug("Checkpoint {} canceled, skipping 
alignment", barrierId);
                        }
 
-                       notifyAbort(barrierId);
+                       notifyAbortOnCancellationBarrier(barrierId);
                }
 
                // else: trailing barrier from either
@@ -301,12 +305,12 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                //   - the current checkpoint if it was already canceled
        }
 
-       private void processEndOfPartition(int channel) throws Exception {
+       private void processEndOfPartition() throws Exception {
                numClosedChannels++;
 
                if (numBarriersReceived > 0) {
                        // let the task know we skip a checkpoint
-                       notifyAbort(currentCheckpointId);
+                       notifyAbort(currentCheckpointId, new 
InputEndOfStreamException());
 
                        // no chance to complete this checkpoint
                        releaseBlocksAndResetBarriers();
@@ -326,9 +330,13 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                }
        }
 
-       private void notifyAbort(long checkpointId) throws Exception {
+       private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+               notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException());
+       }
+
+       private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception {
                if (toNotifyOnCheckpoint != null) {
-                       toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+                       toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index cce7cb4..72838bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -234,7 +235,8 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
 
        private void notifyAbort(long checkpointId) throws Exception {
                if (toNotifyOnCheckpoint != null) {
-                       toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+                       toNotifyOnCheckpoint.abortCheckpointOnBarrier(
+                                       checkpointId, new CheckpointDeclineOnCancellationBarrierException());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1e03a96..bd34044 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -540,22 +542,24 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        @Override
-       public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+       public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
                LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
 
+               // notify the coordinator that we decline this checkpoint
+               getEnvironment().declineCheckpoint(checkpointId, cause);
+
+               // notify all downstream operators that they should not wait for a barrier from us
                synchronized (lock) {
-                       if (isRunning) {
-                               operatorChain.broadcastCheckpointCancelMarker(checkpointId);
-                       }
+                       operatorChain.broadcastCheckpointCancelMarker(checkpointId);
                }
        }
 
        private boolean performCheckpoint(CheckpointMetaData 
checkpointMetaData) throws Exception {
-
                LOG.debug("Starting checkpoint {} on task {}", 
checkpointMetaData.getCheckpointId(), getName());
 
                synchronized (lock) {
                        if (isRunning) {
+                               // we can do a checkpoint
 
                                // Since both state checkpointing and 
downstream barrier emission occurs in this
                                // lock scope, they are an atomic operation 
regardless of the order in which they occur.
@@ -566,7 +570,18 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                                checkpointState(checkpointMetaData);
                                return true;
-                       } else {
+                       }
+                       else {
+                               // we cannot perform our checkpoint - let the 
downstream operators know that they
+                               // should not wait for any input from this 
operator
+
+                               // we cannot broadcast the cancellation markers 
on the 'operator chain', because it may not
+                               // yet be created
+                               final CancelCheckpointMarker message = new 
CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
+                               for (ResultPartitionWriter output : 
getEnvironment().getAllWriters()) {
+                                       output.writeEventToAllChannels(message);
+                               }
+
                                return false;
                        }
                }
@@ -832,10 +847,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                private final List<OperatorSnapshotResult> 
snapshotInProgressList;
 
-               RunnableFuture<KeyGroupsStateHandle> 
futureKeyedBackendStateHandles;
-               RunnableFuture<KeyGroupsStateHandle> 
futureKeyedStreamStateHandles;
+               private RunnableFuture<KeyGroupsStateHandle> 
futureKeyedBackendStateHandles;
+               private RunnableFuture<KeyGroupsStateHandle> 
futureKeyedStreamStateHandles;
 
-               List<StreamStateHandle> nonPartitionedStateHandles;
+               private List<StreamStateHandle> nonPartitionedStateHandles;
 
                private final CheckpointMetaData checkpointMetaData;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 8754e10..446cc77 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -52,6 +54,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -571,7 +574,7 @@ public class BarrierBufferTest {
                        check(sequence[12], buffer.getNextNonBlocked());
                        assertEquals(3L, buffer.getCurrentCheckpointId());
                        validateAlignmentTime(startTs, buffer);
-                       verify(toNotify).abortCheckpointOnBarrier(2L);
+                       verify(toNotify).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineSubsumedException.class));
                        check(sequence[16], buffer.getNextNonBlocked());
 
                        // checkpoint 3 alignment in progress
@@ -579,7 +582,7 @@ public class BarrierBufferTest {
 
                        // checkpoint 3 aborted (end of partition)
                        check(sequence[20], buffer.getNextNonBlocked());
-                       verify(toNotify).abortCheckpointOnBarrier(3L);
+                       verify(toNotify).abortCheckpointOnBarrier(eq(3L), 
any(CheckpointDeclineSubsumedException.class));
 
                        // replay buffered data from checkpoint 3
                        check(sequence[18], buffer.getNextNonBlocked());
@@ -1004,13 +1007,13 @@ public class BarrierBufferTest {
                check(sequence[6], buffer.getNextNonBlocked());
                assertEquals(5L, buffer.getCurrentCheckpointId());
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[8], buffer.getNextNonBlocked());
                assertEquals(6L, buffer.getCurrentCheckpointId());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
                
                buffer.cleanup();
@@ -1078,7 +1081,7 @@ public class BarrierBufferTest {
                // canceled checkpoint on last barrier
                startTs = System.nanoTime();
                check(sequence[12], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                validateAlignmentTime(startTs, buffer);
                check(sequence[13], buffer.getNextNonBlocked());
 
@@ -1093,7 +1096,7 @@ public class BarrierBufferTest {
 
                // this checkpoint gets immediately canceled
                check(sequence[24], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // some buffers
@@ -1109,7 +1112,7 @@ public class BarrierBufferTest {
                check(sequence[33], buffer.getNextNonBlocked());
 
                check(sequence[37], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                // all done
@@ -1172,7 +1175,7 @@ public class BarrierBufferTest {
 
                // re-read the queued cancellation barriers
                check(sequence[9], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[10], buffer.getNextNonBlocked());
@@ -1189,7 +1192,7 @@ public class BarrierBufferTest {
 
                // no further checkpoint (abort) notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(CheckpointDeclineOnCancellationBarrierException.class));
 
                // all done
                assertNull(buffer.getNextNonBlocked());
@@ -1258,7 +1261,7 @@ public class BarrierBufferTest {
                // cancelled by cancellation barrier
                check(sequence[4], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+               verify(toNotify).abortCheckpointOnBarrier(eq(1L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
 
                // the next checkpoint alignment starts now
                startTs = System.nanoTime();
@@ -1270,7 +1273,7 @@ public class BarrierBufferTest {
                // checkpoint done
                check(sequence[7], buffer.getNextNonBlocked());
                validateAlignmentTime(startTs, buffer);
-               verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
+               verify(toNotify).triggerCheckpointOnBarrier(argThat(new 
CheckpointMatcher(2L)));
 
                // queued data
                check(sequence[10], buffer.getNextNonBlocked());
@@ -1290,7 +1293,7 @@ public class BarrierBufferTest {
 
                // check overall notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
        /**
@@ -1342,7 +1345,7 @@ public class BarrierBufferTest {
                // future barrier aborts checkpoint
                startTs = System.nanoTime();
                check(sequence[3], buffer.getNextNonBlocked());
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), 
any(CheckpointDeclineSubsumedException.class));
                check(sequence[4], buffer.getNextNonBlocked());
 
                // alignment of next checkpoint
@@ -1371,7 +1374,7 @@ public class BarrierBufferTest {
 
                // check overall notifications
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
-               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+               verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), 
any(Throwable.class));
        }
 
        // 
------------------------------------------------------------------------
@@ -1479,7 +1482,7 @@ public class BarrierBufferTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {}
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {}
 
                @Override
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index fa3363e..7ae144d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -482,7 +482,7 @@ public class BarrierTrackerTest {
                }
 
                @Override
-               public void abortCheckpointOnBarrier(long checkpointId) {
+               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
                        assertTrue("More checkpoints than expected", i < 
checkpointIDs.length);
 
                        final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 42d7cec..be93f6a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -305,6 +306,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
                testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
 
+               expectedOutput.add(new CancelCheckpointMarker(0));
                expectedOutput.add(new StreamRecord<String>("Hello-0-0", 
initialTime));
                expectedOutput.add(new StreamRecord<String>("Ciao-0-0", 
initialTime));
                expectedOutput.add(new CheckpointBarrier(1, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2376a60..52daf6f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -322,6 +322,9 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
+       public void declineCheckpoint(long checkpointId, Throwable cause) {}
+
+       @Override
        public void failExternally(Throwable cause) {
                this.wasFailedExternally = true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
new file mode 100644
index 0000000..95828f8
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class StreamTaskCancellationBarrierTest {
+
+       /**
+        * This test checks that tasks emit a proper cancel checkpoint barrier, 
if a "trigger checkpoint" message
+        * comes before they are ready.
+        */
+       @Test
+       public void testEmitCancellationBarrierWhenNotReady() throws Exception {
+               StreamTask<String, ?> task = new InitBlockingTask();
+               StreamTaskTestHarness<String> testHarness = new 
StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+
+               // start the task - it blocks inside the 'init()' method and 
does not get past it
+               testHarness.invoke();
+
+               // tell the task to commence a checkpoint
+               boolean result = task.triggerCheckpoint(new 
CheckpointMetaData(41L, System.currentTimeMillis()));
+               assertFalse("task triggered checkpoint though not ready", 
result);
+
+               // a cancellation barrier should be downstream
+               Object emitted = testHarness.getOutput().poll();
+               assertNotNull("nothing emitted", emitted);
+               assertTrue("wrong type emitted", emitted instanceof 
CancelCheckpointMarker);
+               assertEquals("wrong checkpoint id", 41L, 
((CancelCheckpointMarker) emitted).getCheckpointId());
+       }
+
+       /**
+        * This test verifies (for one-input tasks) that the stream tasks react 
in the following way to
+        * receiving a checkpoint cancellation barrier:
+        * 
+        *   - send a "decline checkpoint" notification out (to the JobManager)
+        *   - emit a cancellation barrier downstream
+        */
+       @Test
+       public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
+
+               OneInputStreamTask<String, String> task = new 
OneInputStreamTask<String, String>();
+               OneInputStreamTaskTestHarness<String, String> testHarness = new 
OneInputStreamTaskTestHarness<>(
+                               task,
+                               1, 2,
+                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
IdentityMap());
+               streamConfig.setStreamOperator(mapOperator);
+
+               StreamMockEnvironment environment = 
spy(testHarness.createEnvironment());
+
+               // start the task
+               testHarness.invoke(environment);
+               testHarness.waitForTaskRunning();
+
+               // emit cancellation barriers
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1);
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+               testHarness.waitForInputProcessing();
+
+               // the decline call should go to the coordinator
+               verify(environment, times(1)).declineCheckpoint(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+
+               // a cancellation barrier should be downstream
+               Object result = testHarness.getOutput().poll();
+               assertNotNull("nothing emitted", result);
+               assertTrue("wrong type emitted", result instanceof 
CancelCheckpointMarker);
+               assertEquals("wrong checkpoint id", 2L, 
((CancelCheckpointMarker) result).getCheckpointId());
+
+               // cancel and shutdown
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
+       /**
+        * This test verifies (for two-input tasks) that the stream tasks react 
in the following way to
+        * receiving a checkpoint cancellation barrier:
+        *
+        *   - send a "decline checkpoint" notification out (to the JobManager)
+        *   - emit a cancellation barrier downstream
+        */
+       @Test
+       public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
+
+               TwoInputStreamTask<String, String, String> task = new 
TwoInputStreamTask<String, String, String>();
+               TwoInputStreamTaskTestHarness<String, String, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
+                               task,
+                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               CoStreamMap<String, String, String> op = new CoStreamMap<>(new 
UnionCoMap());
+               streamConfig.setStreamOperator(op);
+
+               StreamMockEnvironment environment = 
spy(testHarness.createEnvironment());
+
+               // start the task
+               testHarness.invoke(environment);
+               testHarness.waitForTaskRunning();
+
+               // emit cancellation barriers
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+               testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0);
+               testHarness.waitForInputProcessing();
+
+               // the decline call should go to the coordinator
+               verify(environment, times(1)).declineCheckpoint(eq(2L), 
any(CheckpointDeclineOnCancellationBarrierException.class));
+
+               // a cancellation barrier should be downstream
+               Object result = testHarness.getOutput().poll();
+               assertNotNull("nothing emitted", result);
+               assertTrue("wrong type emitted", result instanceof 
CancelCheckpointMarker);
+               assertEquals("wrong checkpoint id", 2L, 
((CancelCheckpointMarker) result).getCheckpointId());
+
+               // cancel and shutdown
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test tasks / functions
+       // 
------------------------------------------------------------------------
+
+       private static class InitBlockingTask extends StreamTask<String, 
AbstractStreamOperator<String>> {
+
+               private final Object lock = new Object();
+               private volatile boolean running = true;
+               
+               @Override
+               protected void init() throws Exception {
+                       synchronized (lock) {
+                               while (running) {
+                                       lock.wait();
+                               }
+                       }
+               }
+
+               @Override
+               protected void run() throws Exception {}
+
+               @Override
+               protected void cleanup() throws Exception {}
+
+               @Override
+               protected void cancelTask() throws Exception {
+                       running = false;
+                       synchronized (lock) {
+                               lock.notifyAll();
+                       }
+               }
+       }
+
+       private static class IdentityMap implements MapFunction<String, String> 
{
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String map(String value) throws Exception {
+                       return value;
+               }
+       }
+
+       private static class UnionCoMap implements CoMapFunction<String, 
String, String> {
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public String map1(String value) throws Exception {
+                       return value;
+               }
+
+               @Override
+               public String map2(String value) throws Exception {
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index c531c0d..b71e38d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -150,21 +151,18 @@ public class StreamTaskTestHarness<OUT> {
 
        }
 
+       public StreamMockEnvironment createEnvironment() {
+               return new StreamMockEnvironment(
+                               jobConfig, taskConfig, executionConfig, 
memorySize, new MockInputSplitProvider(), bufferSize);
+       }
+
        /**
         * Invoke the Task. This resets the output of any previous invocation. 
This will start a new
         * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} 
to wait for the
         * Task thread to finish running.
         */
        public void invoke() throws Exception {
-               mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, 
executionConfig,
-                       memorySize, new MockInputSplitProvider(), bufferSize);
-               task.setEnvironment(mockEnv);
-
-               initializeInputs();
-               initializeOutput();
-
-               taskThread = new TaskThread(task);
-               taskThread.start();
+               invoke(createEnvironment());
        }
 
        /**
@@ -237,7 +235,7 @@ public class StreamTaskTestHarness<OUT> {
                        if (taskThread.task instanceof StreamTask) {
                                StreamTask<?, ?> streamTask = (StreamTask<?, 
?>) taskThread.task;
                                while (!streamTask.isRunning()) {
-                                       Thread.sleep(100);
+                                       Thread.sleep(10);
                                        if (!taskThread.isAlive()) {
                                                if (taskThread.getError() != 
null) {
                                                        throw new 
Exception("Task Thread failed due to an error.", taskThread.getError());
@@ -314,15 +312,14 @@ public class StreamTaskTestHarness<OUT> {
        /**
         * This only returns after all input queues are empty.
         */
-       public void waitForInputProcessing() {
-
+       public void waitForInputProcessing() throws Exception {
 
-               // first wait for all input queues to be empty
-               try {
-                       Thread.sleep(1);
-               } catch (InterruptedException ignored) {}
-               
                while (true) {
+                       Throwable error = taskThread.getError();
+                       if (error != null) {
+                               throw new Exception("Exception in the task 
thread", error);
+                       }
+
                        boolean allEmpty = true;
                        for (int i = 0; i < numInputGates; i++) {
                                if (!inputGates[i].allQueuesEmpty()) {

Reply via email to