[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a4fdfff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a4fdfff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a4fdfff Branch: refs/heads/release-1.1 Commit: 1a4fdfff5d364a35e935604c0a5058a1a9f242f7 Parents: a1f028d Author: Stephan Ewen <[email protected]> Authored: Tue Nov 8 17:13:19 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 19:07:16 2016 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 18 +- .../AlignmentLimitExceededException.java | 33 +++ .../decline/CheckpointDeclineException.java | 35 +++ ...ntDeclineOnCancellationBarrierException.java | 32 +++ .../CheckpointDeclineSubsumedException.java | 32 +++ ...intDeclineTaskNotCheckpointingException.java | 32 +++ .../CheckpointDeclineTaskNotReadyException.java | 32 +++ .../decline/InputEndOfStreamException.java | 32 +++ .../flink/runtime/execution/Environment.java | 9 + .../runtime/jobgraph/tasks/StatefulTask.java | 3 +- .../messages/checkpoint/DeclineCheckpoint.java | 65 +++--- .../runtime/taskmanager/RuntimeEnvironment.java | 7 + .../apache/flink/runtime/taskmanager/Task.java | 22 +- .../checkpoint/CheckpointCoordinatorTest.java | 41 +--- .../savepoint/SavepointCoordinatorTest.java | 2 +- .../jobmanager/JobManagerHARecoveryTest.java | 2 +- .../operators/testutils/DummyEnvironment.java | 5 + .../operators/testutils/MockEnvironment.java | 5 + .../runtime/taskmanager/TaskAsyncCallTest.java | 2 +- .../streaming/runtime/io/BarrierBuffer.java | 28 ++- .../streaming/runtime/io/BarrierTracker.java | 4 +- .../streaming/runtime/tasks/StreamTask.java | 25 ++- .../streaming/runtime/io/BarrierBufferTest.java | 31 +-- .../runtime/io/BarrierTrackerTest.java | 2 +- .../runtime/tasks/OneInputStreamTaskTest.java | 2 + .../runtime/tasks/StreamMockEnvironment.java | 3 + 
.../StreamTaskCancellationBarrierTest.java | 221 +++++++++++++++++++ .../runtime/tasks/StreamTaskTestHarness.java | 31 ++- .../runtime/tasks/TwoInputStreamTaskTest.java | 2 + 29 files changed, 632 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 409f05b..8661ddc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -575,6 +575,7 @@ public class CheckpointCoordinator { } final long checkpointId = message.getCheckpointId(); + final String reason = (message.getReason() != null ? 
message.getReason().getMessage() : ""); PendingCheckpoint checkpoint; @@ -594,8 +595,8 @@ public class CheckpointCoordinator { if (checkpoint != null && !checkpoint.isDiscarded()) { isPendingCheckpoint = true; - LOG.info("Discarding checkpoint " + checkpointId - + " because of checkpoint decline from task " + message.getTaskExecutionId()); + LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}", + checkpointId, message.getTaskExecutionId(), reason); pendingCheckpoints.remove(checkpointId); checkpoint.discard(userClassLoader); @@ -604,19 +605,14 @@ public class CheckpointCoordinator { onCancelCheckpoint(checkpointId); boolean haveMoreRecentPending = false; - Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); - while (entries.hasNext()) { - PendingCheckpoint p = entries.next().getValue(); - if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { + for (PendingCheckpoint p : pendingCheckpoints.values()) { + if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) { haveMoreRecentPending = true; break; } } - if (!haveMoreRecentPending && !triggerRequestQueued) { - LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); - triggerCheckpoint(System.currentTimeMillis()); - } else if (!haveMoreRecentPending) { - LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); + + if (!haveMoreRecentPending) { triggerQueuedRequests(); } } else if (checkpoint != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java new file mode 100644 index 0000000..64d57bc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because too many bytes were + * buffered in the alignment phase. 
+ */ +public final class AlignmentLimitExceededException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public AlignmentLimitExceededException(long numBytes) { + super("The checkpoint alignment phase needed to buffer more than the configured maximum (" + + numBytes + " bytes)."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java new file mode 100644 index 0000000..8a2802c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Base class of all exceptions that indicate a declined checkpoint. 
+ */ +public abstract class CheckpointDeclineException extends Exception { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineException(String message) { + super(message); + } + + public CheckpointDeclineException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java new file mode 100644 index 0000000..9ae4096 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a cancellation + * barrier was received. + */ +public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineOnCancellationBarrierException() { + super("Task received cancellation from one of its inputs"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java new file mode 100644 index 0000000..5380469 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a newer checkpoint + * barrier was received on an input before the pending checkpoint's barrier. + */ +public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineSubsumedException(long newCheckpointId) { + super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java new file mode 100644 index 0000000..e5773d1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a task does not support + * checkpointing. + */ +public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineTaskNotCheckpointingException(String taskName) { + super("Task '" + taskName + "'does not support checkpointing"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java new file mode 100644 index 0000000..a1214fe --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a task was not + * ready to perform a checkpoint. + */ +public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineTaskNotReadyException(String taskName) { + super("Task " + taskName + " was not running"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java new file mode 100644 index 0000000..86b29dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because one of the input + * streams reached its end before the alignment was complete. + */ +public final class InputEndOfStreamException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public InputEndOfStreamException() { + super("Checkpoint was declined because one input stream is finished"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 5ad5fe2..e84a5e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -167,6 +167,15 @@ public interface Environment { void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state); /** + * Declines a checkpoint. This tells the checkpoint coordinator that this task will + * not be able to successfully complete a certain checkpoint. + * + * @param checkpointId The ID of the declined checkpoint. + * @param cause An optional reason why the checkpoint was declined. + */ + void declineCheckpoint(long checkpointId, Throwable cause); + + /** * Marks task execution failed for an external reason (a reason other than the task code itself * throwing an exception). If the task is already in a terminal state * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. 
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index 7c581df..874ca4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -66,8 +66,9 @@ public interface StatefulTask<T extends StateHandle<?>> { * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs. * * @param checkpointId The ID of the checkpoint to be aborted. + * @param cause The reason why the checkpoint was aborted during alignment */ - void abortCheckpointOnBarrier(long checkpointId) throws Exception; + void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception; /** * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java index f26d2fb..dca212c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java @@ -19,7 +19,14 @@ package org.apache.flink.runtime.messages.checkpoint; import org.apache.flink.api.common.JobID; +import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.util.SerializedThrowable; /** * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the @@ -31,44 +38,48 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java private static final long serialVersionUID = 2094094662279578953L; - /** The timestamp associated with the checkpoint */ - private final long timestamp; + /** The reason why the checkpoint was declined */ + private final Throwable reason; - public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) { - super(job, taskExecutionId, checkpointId); - this.timestamp = timestamp; + public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) { + this(job, taskExecutionId, checkpointId, null); } - // -------------------------------------------------------------------------------------------- + public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) { + super(job, taskExecutionId, checkpointId); - public long getTimestamp() { - return timestamp; + if (reason == null || + reason.getClass() == AlignmentLimitExceededException.class || + reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class || + reason.getClass() == CheckpointDeclineSubsumedException.class || + reason.getClass() 
== CheckpointDeclineTaskNotCheckpointingException.class || + reason.getClass() == CheckpointDeclineTaskNotReadyException.class || + reason.getClass() == InputEndOfStreamException.class) + { + // null or known common exceptions that cannot reference any dynamically loaded code + this.reason = reason; + } else { + // some other exception. replace with a serialized throwable, to be on the safe side + this.reason = new SerializedThrowable(reason); + } } // -------------------------------------------------------------------------------------------- - @Override - public int hashCode() { - return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32)); + /** + * Gets the reason why the checkpoint was declined. + * + * @return The reason why the checkpoint was declined + */ + public Throwable getReason() { + return reason; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - else if (o instanceof DeclineCheckpoint) { - DeclineCheckpoint that = (DeclineCheckpoint) o; - return this.timestamp == that.timestamp && super.equals(o); - } - else { - return false; - } - } + // -------------------------------------------------------------------------------------------- @Override public String toString() { - return String.format("Declined Checkpoint %d@%d for (%s/%s)", - getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId()); + return String.format("Declined Checkpoint %d for (%s/%s): %s", + getCheckpointId(), getJob(), getTaskExecutionId(), reason); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 6fdf6f9..47149a1 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -268,6 +269,12 @@ public class RuntimeEnvironment implements Environment { } @Override + public void declineCheckpoint(long checkpointId, Throwable cause) { + DeclineCheckpoint message = new DeclineCheckpoint(jobId, executionId, checkpointId, cause); + jobManager.tell(message); + } + + @Override public void failExternally(Throwable cause) { this.containingTask.failExternally(cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index ed15dbf..3eab7e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException; +import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -969,13 +971,16 @@ public class Task implements Runnable { try { boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp); if (!success) { - DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp); + // task was not ready to trigger this checkpoint + DeclineCheckpoint decline = new DeclineCheckpoint( + jobId, getExecutionId(), checkpointID, + new CheckpointDeclineTaskNotReadyException(taskName)); jobManager.tell(decline); } } catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { - failExternally(new RuntimeException( + failExternally(new Exception( "Error while triggering checkpoint for " + taskName, t)); } @@ -987,10 +992,21 @@ public class Task implements Runnable { else { LOG.error("Task received a checkpoint request, but is not a checkpointing task - " + taskNameWithSubtask); + + DeclineCheckpoint decline = new DeclineCheckpoint( + jobId, executionId, checkpointID, + new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask)); + jobManager.tell(decline); } } else { - LOG.debug("Ignoring request to trigger a checkpoint for non-running task."); + LOG.debug("Declining checkpoint request for non-running task"); + + // send back a message that we did not do the checkpoint + DeclineCheckpoint decline = new DeclineCheckpoint( + jobId, executionId, checkpointID, + new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask)); + jobManager.tell(decline); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 62af42b..f3f988a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -288,44 +288,19 @@ public class CheckpointCoordinatorTest { // decline checkpoint from the other task, this should cancel the checkpoint // and trigger a new one - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId)); assertTrue(checkpoint.isDiscarded()); - // validate that we have a new pending checkpoint - assertEquals(1, coord.getNumberOfPendingCheckpoints()); + // validate that we have no new pending checkpoint + assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew); - - assertNotNull(checkpointNew); - assertEquals(checkpointIdNew, checkpointNew.getCheckpointId()); - assertEquals(jid, checkpointNew.getJobId()); - assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks()); - assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks()); - assertEquals(0, checkpointNew.getTaskStates().size()); - assertFalse(checkpointNew.isDiscarded()); - assertFalse(checkpointNew.isFullyAcknowledged()); - assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId()); - - // check that the vertices received the new trigger checkpoint message - { - 
TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp()); - TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, checkpointNew.getCheckpointTimestamp()); - verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1)); - verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2)); - } - // decline again, nothing should happen // decline from the other task, nothing should happen - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp())); - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId)); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId)); assertTrue(checkpoint.isDiscarded()); - // should still have the same second checkpoint pending - long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - assertEquals(checkpointIdNew2, checkpointIdNew); - coord.shutdown(); } catch (Exception e) { @@ -422,7 +397,7 @@ public class CheckpointCoordinatorTest { // decline checkpoint from one of the tasks, this should cancel the checkpoint // and trigger a new one - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id)); assertTrue(checkpoint1.isDiscarded()); // validate that we have only one pending checkpoint left @@ -446,8 +421,8 @@ public class CheckpointCoordinatorTest { // decline again, nothing should happen // decline from the other task, nothing should happen - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, 
checkpoint1.getCheckpointTimestamp())); - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id)); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id)); assertTrue(checkpoint1.isDiscarded()); coord.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java index b1b384d..dc2b23f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java @@ -202,7 +202,7 @@ public class SavepointCoordinatorTest extends TestLogger { coordinator.receiveDeclineMessage(new DeclineCheckpoint( jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, 0)); + checkpointId)); // The pending checkpoint is completed http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 4dfaf95..2e3bace 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -455,7 +455,7 @@ public class JobManagerHARecoveryTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { throw new UnsupportedOperationException("should not be called!"); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 5af34fb..ca68683 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment { public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {} @Override + public void declineCheckpoint(long checkpointId, Throwable cause) { + throw new UnsupportedOperationException(); + } + + @Override public void failExternally(Throwable cause) { throw new UnsupportedOperationException("DummyEnvironment does not support external task failure."); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 9dea324..22dee63 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -291,6 +291,11 @@ public class MockEnvironment implements Environment { } @Override + public void declineCheckpoint(long checkpointId, Throwable cause) { + throw new UnsupportedOperationException(); + } + + @Override public void failExternally(Throwable cause) { throw new UnsupportedOperationException("MockEnvironment does not support external task failure."); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 5b344eb..3ace0f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -226,7 +226,7 @@ public class TaskAsyncCallTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { throw new UnsupportedOperationException("Should not be called"); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 36de717..7a8e7d3 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -18,6 +18,10 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -142,7 +146,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } else { if (next.getEvent().getClass() == EndOfPartitionEvent.class) { - processEndOfPartition(next.getChannelIndex()); + processEndOfPartition(); } return next; } @@ -196,7 +200,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { "Skipping current checkpoint.", barrierId, currentCheckpointId); // let the task know we are not completing this - notifyAbort(currentCheckpointId); + notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); // abort the current checkpoint releaseBlocksAndResetBarriers(); @@ -241,7 +245,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (barrierId > currentCheckpointId) { // new checkpoint currentCheckpointId = barrierId; - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } return; } @@ -258,7 +262,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } releaseBlocksAndResetBarriers(); - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } else if (barrierId > currentCheckpointId) { // we canceled 
the next which also cancels the current @@ -272,7 +276,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentCheckpointId = barrierId; startOfAlignmentTimestamp = 0L; latestAlignmentDurationNanos = 0L; - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now) @@ -292,7 +296,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId); } - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } // else: trailing barrier from either @@ -300,12 +304,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // - the current checkpoint if it was already canceled } - private void processEndOfPartition(int channel) throws Exception { + private void processEndOfPartition() throws Exception { numClosedChannels++; if (numBarriersReceived > 0) { // let the task know we skip a checkpoint - notifyAbort(currentCheckpointId); + notifyAbort(currentCheckpointId, new InputEndOfStreamException()); // no chance to complete this checkpoint releaseBlocksAndResetBarriers(); @@ -319,9 +323,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } - private void notifyAbort(long checkpointId) throws Exception { + private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception { + notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException()); + } + + private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception { if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index 5157336..8b4cc48 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; @@ -227,7 +228,8 @@ public class BarrierTracker implements CheckpointBarrierHandler { private void notifyAbort(long checkpointId) throws Exception { if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + toNotifyOnCheckpoint.abortCheckpointOnBarrier( + checkpointId, new CheckpointDeclineOnCancellationBarrierException()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d55a9c5..4f0839f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -25,6 +25,8 @@ import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.execution.CancelTaskException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -592,13 +594,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } @Override - public void abortCheckpointOnBarrier(long checkpointId) throws Exception { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception { LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName()); + // notify the coordinator that we decline this checkpoint + getEnvironment().declineCheckpoint(checkpointId, cause); + + // notify all downstream operators that they should not wait for a barrier from us synchronized (lock) { - if (isRunning) { - operatorChain.broadcastCheckpointCancelMarker(checkpointId); - } + operatorChain.broadcastCheckpointCancelMarker(checkpointId); } } @@ -669,7 +673,18 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> checkpointThread.start(); } return true; - } else { + } + else { + // we cannot perform our checkpoint - let the downstream operators know that they + // should not wait for any input from this operator + + // we cannot broadcast the cancellation markers on the 'operator chain', because it may not + // yet be created + final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId); + for (ResultPartitionWriter output : getEnvironment().getAllWriters()) { + output.writeEventToAllChannels(message); + } + return false; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index cf1f98e..20cb8f7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; @@ -45,6 +47,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -566,7 +569,7 @@ public class BarrierBufferTest { check(sequence[12], buffer.getNextNonBlocked()); assertEquals(3L, buffer.getCurrentCheckpointId()); validateAlignmentTime(startTs, buffer); - verify(toNotify).abortCheckpointOnBarrier(2L); + verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class)); check(sequence[16], buffer.getNextNonBlocked()); // checkpoint 3 alignment in progress @@ -574,7 +577,7 @@ public class 
BarrierBufferTest { // checkpoint 3 aborted (end of partition) check(sequence[20], buffer.getNextNonBlocked()); - verify(toNotify).abortCheckpointOnBarrier(3L); + verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); // replay buffered data from checkpoint 3 check(sequence[18], buffer.getNextNonBlocked()); @@ -999,13 +1002,13 @@ public class BarrierBufferTest { check(sequence[6], buffer.getNextNonBlocked()); assertEquals(5L, buffer.getCurrentCheckpointId()); verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong()); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[8], buffer.getNextNonBlocked()); assertEquals(6L, buffer.getCurrentCheckpointId()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); buffer.cleanup(); @@ -1073,7 +1076,7 @@ public class BarrierBufferTest { // canceled checkpoint on last barrier startTs = System.nanoTime(); check(sequence[12], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); validateAlignmentTime(startTs, buffer); check(sequence[13], buffer.getNextNonBlocked()); @@ -1088,7 +1091,7 @@ public class BarrierBufferTest { // this checkpoint gets immediately canceled check(sequence[24], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), 
any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); // some buffers @@ -1104,7 +1107,7 @@ public class BarrierBufferTest { check(sequence[33], buffer.getNextNonBlocked()); check(sequence[37], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); // all done @@ -1167,7 +1170,7 @@ public class BarrierBufferTest { // re-read the queued cancellation barriers check(sequence[9], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[10], buffer.getNextNonBlocked()); @@ -1184,7 +1187,7 @@ public class BarrierBufferTest { // no further checkpoint (abort) notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class)); // all done assertNull(buffer.getNextNonBlocked()); @@ -1253,7 +1256,7 @@ public class BarrierBufferTest { // cancelled by cancellation barrier check(sequence[4], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).abortCheckpointOnBarrier(1L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class)); // the next checkpoint alignment starts now startTs = System.nanoTime(); @@ -1285,7 +1288,7 @@ public class BarrierBufferTest { // check overall notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong()); - 
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } /** @@ -1337,7 +1340,7 @@ public class BarrierBufferTest { // future barrier aborts checkpoint startTs = System.nanoTime(); check(sequence[3], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(3L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); check(sequence[4], buffer.getNextNonBlocked()); // alignment of next checkpoint @@ -1366,7 +1369,7 @@ public class BarrierBufferTest { // check overall notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } // ------------------------------------------------------------------------ @@ -1471,7 +1474,7 @@ public class BarrierBufferTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) {} + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {} @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 903f585..978c212 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -481,7 +481,7 @@ public class BarrierTrackerTest { } @Override 
- public void abortCheckpointOnBarrier(long checkpointId) { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { assertTrue("More checkpoints than expected", i < checkpointIDs.length); final long expectedId = checkpointIDs[i++]; http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 5fcc59e..9003f0e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -270,6 +271,7 @@ public class OneInputStreamTaskTest { testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0); testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1); + expectedOutput.add(new CancelCheckpointMarker(0)); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); expectedOutput.add(new CheckpointBarrier(1, 1)); 
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 7084208..36ad8ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -302,6 +302,9 @@ public class StreamMockEnvironment implements Environment { } @Override + public void declineCheckpoint(long checkpointId, Throwable cause) {} + + @Override public void failExternally(Throwable cause) { throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure."); } http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java new file mode 100644 index 0000000..8b8b659 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.co.CoStreamMap; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + 
+@RunWith(PowerMockRunner.class) +@PrepareForTest({ResultPartitionWriter.class}) +@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"}) +public class StreamTaskCancellationBarrierTest { + + /** + * This test checks that tasks emit a proper cancel checkpoint barrier, if a "trigger checkpoint" message + * comes before they are ready. + */ + @Test + public void testEmitCancellationBarrierWhenNotReady() throws Exception { + StreamTask<String, ?> task = new InitBlockingTask(); + StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO); + + // start the test - this cannot succeed across the 'init()' method + testHarness.invoke(); + + // tell the task to commence a checkpoint + boolean result = task.triggerCheckpoint(41L, System.currentTimeMillis()); + assertFalse("task triggered checkpoint though not ready", result); + + // a cancellation barrier should be downstream + Object emitted = testHarness.getOutput().poll(); + assertNotNull("nothing emitted", emitted); + assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker); + assertEquals("wrong checkpoint id", 41L, ((CancelCheckpointMarker) emitted).getCheckpointId()); + } + + /** + * This test verifies (for one-input tasks) that the Stream tasks react the following way to + * receiving a checkpoint cancellation barrier: + * + * - send a "decline checkpoint" notification out (to the JobManager) + * - emit a cancellation barrier downstream + */ + @Test + public void testDeclineCallOnCancelBarrierOneInput() throws Exception { + + OneInputStreamTask<String, String> task = new OneInputStreamTask<String, String>(); + OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( + task, + 1, 2, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
+ streamConfig.setStreamOperator(mapOperator); + + StreamMockEnvironment environment = spy(testHarness.createEnvironment()); + + // start the task + testHarness.invoke(environment); + testHarness.waitForTaskRunning(); + + // emit cancellation barriers + testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1); + testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0); + testHarness.waitForInputProcessing(); + + // the decline call should go to the coordinator + verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + + // a cancellation barrier should be downstream + Object result = testHarness.getOutput().poll(); + assertNotNull("nothing emitted", result); + assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker); + assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId()); + + // cancel and shutdown + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + + /** + * This test verifies (for two-input tasks) that the Stream tasks react the following way to + * receiving a checkpoint cancellation barrier: + * + * - send a "decline checkpoint" notification out (to the JobManager) + * - emit a cancellation barrier downstream + */ + @Test + public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception { + + TwoInputStreamTask<String, String, String> task = new TwoInputStreamTask<String, String, String>(); + TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>( + task, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap()); + streamConfig.setStreamOperator(op); + + StreamMockEnvironment environment = spy(testHarness.createEnvironment()); + + // start the task +
testHarness.invoke(environment); + testHarness.waitForTaskRunning(); + + // emit cancellation barriers + testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0); + testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0); + testHarness.waitForInputProcessing(); + + // the decline call should go to the coordinator + verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + + // a cancellation barrier should be downstream + Object result = testHarness.getOutput().poll(); + assertNotNull("nothing emitted", result); + assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker); + assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId()); + + // cancel and shutdown + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + + // ------------------------------------------------------------------------ + // test tasks / functions + // ------------------------------------------------------------------------ + + private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> { + + private final Object lock = new Object(); + private volatile boolean running = true; + + @Override + protected void init() throws Exception { + synchronized (lock) { + while (running) { + lock.wait(); + } + } + } + + @Override + protected void run() throws Exception {} + + @Override + protected void cleanup() throws Exception {} + + @Override + protected void cancelTask() throws Exception { + running = false; + synchronized (lock) { + lock.notifyAll(); + } + } + } + + private static class IdentityMap implements MapFunction<String, String> { + private static final long serialVersionUID = 1L; + + @Override + public String map(String value) throws Exception { + return value; + } + } + + private static class UnionCoMap implements CoMapFunction<String, String, String> { + private static final long serialVersionUID = 1L; + + @Override + 
public String map1(String value) throws Exception { + return value; + } + + @Override + public String map2(String value) throws Exception { + return value; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 00e95b9..0bd8d9a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -150,21 +150,18 @@ public class StreamTaskTestHarness<OUT> { } + public StreamMockEnvironment createEnvironment() { + return new StreamMockEnvironment( + jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize); + } + /** * Invoke the Task. This resets the output of any previous invocation. This will start a new * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the * Task thread to finish running. 
*/ public void invoke() throws Exception { - mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig, - memorySize, new MockInputSplitProvider(), bufferSize); - task.setEnvironment(mockEnv); - - initializeInputs(); - initializeOutput(); - - taskThread = new TaskThread(task); - taskThread.start(); + invoke(createEnvironment()); } /** @@ -205,7 +202,7 @@ public class StreamTaskTestHarness<OUT> { if (taskThread.task instanceof StreamTask) { StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task; while (!streamTask.isRunning()) { - Thread.sleep(100); + Thread.sleep(10); if (!taskThread.isAlive()) { if (taskThread.getError() != null) { throw new Exception("Task Thread failed due to an error.", taskThread.getError()); @@ -282,15 +279,15 @@ public class StreamTaskTestHarness<OUT> { /** * This only returns after all input queues are empty. */ - public void waitForInputProcessing() { - - + public void waitForInputProcessing() throws Exception { // first wait for all input queues to be empty - try { - Thread.sleep(1); - } catch (InterruptedException ignored) {} - + while (true) { + Throwable error = taskThread.getError(); + if (error != null) { + throw new Exception("Exception in the task thread", error); + } + boolean allEmpty = true; for (int i = 0; i < numInputGates; i++) { if (!inputGates[i].allQueuesEmpty()) { http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index b9211b1..92f8553 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -290,6 +291,7 @@ public class TwoInputStreamTaskTest { testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0); testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1); + expectedOutput.add(new CancelCheckpointMarker(0)); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); expectedOutput.add(new CheckpointBarrier(1, 1));
