[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48a48139 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48a48139 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48a48139 Branch: refs/heads/master Commit: 48a48139172e86f548f3b2f1564bdc948c3fe76a Parents: 0a79dd5 Author: Stephan Ewen <[email protected]> Authored: Wed Nov 2 22:34:59 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Nov 8 21:15:34 2016 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 77 +++---- .../decline/CheckpointDeclineException.java | 35 +++ ...ntDeclineOnCancellationBarrierException.java | 32 +++ .../CheckpointDeclineSubsumedException.java | 32 +++ ...intDeclineTaskNotCheckpointingException.java | 32 +++ .../CheckpointDeclineTaskNotReadyException.java | 32 +++ .../decline/InputEndOfStreamException.java | 32 +++ .../flink/runtime/execution/Environment.java | 13 +- .../runtime/jobgraph/tasks/StatefulTask.java | 3 +- .../messages/checkpoint/DeclineCheckpoint.java | 65 +++--- .../ActorGatewayCheckpointResponder.java | 12 +- .../taskmanager/CheckpointResponder.java | 6 +- .../runtime/taskmanager/RuntimeEnvironment.java | 5 + .../apache/flink/runtime/taskmanager/Task.java | 25 ++- .../flink/runtime/jobmanager/JobManager.scala | 5 +- .../checkpoint/CheckpointCoordinatorTest.java | 175 ++------------- .../jobmanager/JobManagerHARecoveryTest.java | 2 +- .../operators/testutils/DummyEnvironment.java | 5 + .../operators/testutils/MockEnvironment.java | 7 + .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +- .../streaming/runtime/io/BarrierBuffer.java | 28 ++- .../streaming/runtime/io/BarrierTracker.java | 4 +- .../streaming/runtime/tasks/StreamTask.java | 33 ++- .../streaming/runtime/io/BarrierBufferTest.java | 33 +-- .../runtime/io/BarrierTrackerTest.java | 2 +- .../runtime/tasks/OneInputStreamTaskTest.java | 4 +- 
.../runtime/tasks/StreamMockEnvironment.java | 3 + .../StreamTaskCancellationBarrierTest.java | 214 +++++++++++++++++++ .../runtime/tasks/StreamTaskTestHarness.java | 31 ++- .../runtime/tasks/TwoInputStreamTaskTest.java | 4 +- .../EventTimeAllWindowCheckpointingITCase.java | 2 +- 31 files changed, 643 insertions(+), 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 26702c9..8088c3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; @@ -278,10 +279,10 @@ public class CheckpointCoordinator { if (result.isSuccess()) { return result.getPendingCheckpoint().getCompletionFuture(); - } else { + } + else { Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()); - Future<CompletedCheckpoint> failed = FlinkCompletableFuture.completedExceptionally(cause); - return failed; + return FlinkCompletableFuture.completedExceptionally(cause); } } @@ -299,6 +300,7 @@ public class CheckpointCoordinator { return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess(); } + @VisibleForTesting CheckpointTriggerResult 
triggerCheckpoint( long timestamp, CheckpointProperties props, @@ -397,7 +399,7 @@ public class CheckpointCoordinator { // we lock with a special lock to make sure that trigger requests do not overtake each other. // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter' - // may issue blocking operations. Using a different lock than teh coordinator-wide lock, + // may issue blocking operations. Using a different lock than the coordinator-wide lock, // we avoid blocking the processing of 'acknowledge/decline' messages during that time. synchronized (triggerLock) { final long checkpointID; @@ -525,81 +527,74 @@ public class CheckpointCoordinator { } /** - * Receives a {@link DeclineCheckpoint} message and returns whether the - * message was associated with a pending checkpoint. + * Receives a {@link DeclineCheckpoint} message for a pending checkpoint. * * @param message Checkpoint decline from the task manager - * - * @return Flag indicating whether the declined checkpoint was associated - * with a pending checkpoint. */ - public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception { + public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception { if (shutdown || message == null) { - return false; + return; } if (!job.equals(message.getJob())) { - LOG.error("Received DeclineCheckpoint message for wrong job: {}", message); - return false; + throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + + message.getJob() + " while this coordinator handles job " + job); } final long checkpointId = message.getCheckpointId(); + final String reason = (message.getReason() != null ? message.getReason().getMessage() : ""); PendingCheckpoint checkpoint; - // Flag indicating whether the ack message was for a known pending - // checkpoint. 
- boolean isPendingCheckpoint; - synchronized (lock) { // we need to check inside the lock for being shutdown as well, otherwise we // get races and invalid error log messages if (shutdown) { - return false; + return; } checkpoint = pendingCheckpoints.get(checkpointId); if (checkpoint != null && !checkpoint.isDiscarded()) { - isPendingCheckpoint = true; - - LOG.info("Discarding checkpoint " + checkpointId - + " because of checkpoint decline from task " + message.getTaskExecutionId()); + LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}", + checkpointId, message.getTaskExecutionId(), reason); pendingCheckpoints.remove(checkpointId); checkpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); - boolean haveMoreRecentPending = false; + // we don't have to schedule another "dissolving" checkpoint any more because the + // cancellation barriers take care of breaking downstream alignments + // we only need to make sure that suspended queued requests are resumed + boolean haveMoreRecentPending = false; for (PendingCheckpoint p : pendingCheckpoints.values()) { - if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { + if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) { haveMoreRecentPending = true; break; } } - if (!haveMoreRecentPending && !triggerRequestQueued) { - LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); - triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic()); - } else if (!haveMoreRecentPending) { - LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); + + if (!haveMoreRecentPending) { triggerQueuedRequests(); } - } else if (checkpoint != null) { + } + else if (checkpoint != null) { // this should not happen throw new IllegalStateException( "Received message for discarded but non-removed checkpoint " + 
checkpointId); - } else { - // message is for an unknown checkpoint, or comes too late (checkpoint disposed) + } + else if (LOG.isDebugEnabled()) { if (recentPendingCheckpoints.contains(checkpointId)) { - isPendingCheckpoint = true; - LOG.info("Received another decline checkpoint message for now expired checkpoint attempt " + checkpointId); + // message is for an unknown checkpoint, or comes too late (checkpoint disposed) + LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}", + checkpointId, reason); } else { - isPendingCheckpoint = false; + // message is for an unknown checkpoint. might be so old that we don't even remember it any more + LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}", + checkpointId, reason); } } } - - return isPendingCheckpoint; } /** @@ -643,9 +638,7 @@ public class CheckpointCoordinator { if (checkpoint != null && !checkpoint.isDiscarded()) { isPendingCheckpoint = true; - if (checkpoint.acknowledgeTask( - message.getTaskExecutionId(), - message.getSubtaskState())) { + if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) { if (checkpoint.isFullyAcknowledged()) { completed = checkpoint.finalizeCheckpoint(); @@ -672,8 +665,8 @@ public class CheckpointCoordinator { } } else { // checkpoint did not accept message - LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId - + " , task " + message.getTaskExecutionId()); + LOG.error("Received duplicate or invalid acknowledge message for checkpoint {} , task {}", + checkpointId, message.getTaskExecutionId()); } } else if (checkpoint != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java new file mode 100644 index 0000000..8a2802c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Base class of all exceptions that indicate a declined checkpoint. 
+ */ +public abstract class CheckpointDeclineException extends Exception { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineException(String message) { + super(message); + } + + public CheckpointDeclineException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java new file mode 100644 index 0000000..9ae4096 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a cancellation + * barrier was received. + */ +public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineOnCancellationBarrierException() { + super("Task received cancellation from one of its inputs"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java new file mode 100644 index 0000000..5380469 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a newer checkpoint + * barrier was received on an input before the pending checkpoint's barrier. + */ +public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineSubsumedException(long newCheckpointId) { + super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java new file mode 100644 index 0000000..e5773d1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a task does not support + * checkpointing. + */ +public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineTaskNotCheckpointingException(String taskName) { + super("Task '" + taskName + "'does not support checkpointing"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java new file mode 100644 index 0000000..a1214fe --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because a task was not + * ready to perform a checkpoint. + */ +public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public CheckpointDeclineTaskNotReadyException(String taskName) { + super("Task " + taskName + " was not running"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java new file mode 100644 index 0000000..86b29dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.decline; + +/** + * Exception indicating that a checkpoint was declined because one of the input + * stream reached its end before the alignment was complete. + */ +public final class InputEndOfStreamException extends CheckpointDeclineException { + + private static final long serialVersionUID = 1L; + + public InputEndOfStreamException() { + super("Checkpoint was declined because one input stream is finished"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index af1a640..8874eca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -174,9 +174,16 @@ public interface Environment { * @param checkpointMetaData the meta data for this checkpoint * @param subtaskState All state handles for the checkpointed state */ - void acknowledgeCheckpoint( - CheckpointMetaData checkpointMetaData, - SubtaskState subtaskState); + void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState subtaskState); + + /** + * Declines a checkpoint. This tells the checkpoint coordinator that this task will + * not be able to successfully complete a certain checkpoint. + * + * @param checkpointId The ID of the declined checkpoint. + * @param cause An optional reason why the checkpoint was declined. 
+ */ + void declineCheckpoint(long checkpointId, Throwable cause); /** * Marks task execution failed for an external reason (a reason other than the task code itself http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index c91dcf2..39ddc961 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -68,8 +68,9 @@ public interface StatefulTask { * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs. * * @param checkpointId The ID of the checkpoint to be aborted. + * @param cause The reason why the checkpoint was aborted during alignment */ - void abortCheckpointOnBarrier(long checkpointId) throws Exception; + void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception; /** * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java index f26d2fb..830b751 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java @@ 
-19,7 +19,14 @@ package org.apache.flink.runtime.messages.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.util.SerializedThrowable; /** * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the @@ -31,44 +38,48 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java private static final long serialVersionUID = 2094094662279578953L; - /** The timestamp associated with the checkpoint */ - private final long timestamp; + /** The reason why the checkpoint was declined */ + private final Throwable reason; - public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) { - super(job, taskExecutionId, checkpointId); - this.timestamp = timestamp; + public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) { + this(job, taskExecutionId, checkpointId, null); } - // -------------------------------------------------------------------------------------------- - - public long getTimestamp() { - return timestamp; + public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) { + super(job, taskExecutionId, checkpointId); + + if (reason == null || + reason.getClass() == AlignmentLimitExceededException.class || + reason.getClass() == 
CheckpointDeclineOnCancellationBarrierException.class || + reason.getClass() == CheckpointDeclineSubsumedException.class || + reason.getClass() == CheckpointDeclineTaskNotCheckpointingException.class || + reason.getClass() == CheckpointDeclineTaskNotReadyException.class || + reason.getClass() == InputEndOfStreamException.class) + { + // null or known common exceptions that cannot reference any dynamically loaded code + this.reason = reason; + } else { + // some other exception. replace with a serialized throwable, to be on the safe side + this.reason = new SerializedThrowable(reason); + } } // -------------------------------------------------------------------------------------------- - @Override - public int hashCode() { - return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32)); + /** + * Gets the reason why the checkpoint was declined. + * + * @return The reason why the checkpoint was declined + */ + public Throwable getReason() { + return reason; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - else if (o instanceof DeclineCheckpoint) { - DeclineCheckpoint that = (DeclineCheckpoint) o; - return this.timestamp == that.timestamp && super.equals(o); - } - else { - return false; - } - } + // -------------------------------------------------------------------------------------------- @Override public String toString() { - return String.format("Declined Checkpoint %d@%d for (%s/%s)", - getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId()); + return String.format("Declined Checkpoint %d for (%s/%s): %s", + getCheckpointId(), getJob(), getTaskExecutionId(), reason); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java index 38defcc..dafcefe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java @@ -54,17 +54,17 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder { @Override public void declineCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - CheckpointMetaData checkpointMetaData) { + JobID jobID, + ExecutionAttemptID executionAttemptID, + long checkpointId, + Throwable reason) { DeclineCheckpoint decline = new DeclineCheckpoint( jobID, executionAttemptID, - checkpointMetaData.getCheckpointId(), - checkpointMetaData.getTimestamp()); + checkpointId, + reason); actorGateway.tell(decline); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java index 7dbb76c..cdf87d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java @@ -52,10 +52,12 @@ public interface CheckpointResponder { * * @param jobID Job ID of the running job * @param executionAttemptID Execution attempt ID of the running task - * @param checkpointMetaData Meta data for this checkpoint + * @param checkpointId The ID of the declined checkpoint + * @param cause The optional cause why the checkpoint was 
declined */ void declineCheckpoint( JobID jobID, ExecutionAttemptID executionAttemptID, - CheckpointMetaData checkpointMetaData); + long checkpointId, + Throwable cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index fa69a60..7fe94a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -254,6 +254,11 @@ public class RuntimeEnvironment implements Environment { } @Override + public void declineCheckpoint(long checkpointId, Throwable cause) { + checkpointResponder.declineCheckpoint(jobId, executionId, checkpointId, cause); + } + + @Override public void failExternally(Throwable cause) { this.containingTask.failExternally(cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 313973f..ffcf909 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -1008,15 +1010,13 @@ public class Task implements Runnable, TaskActions { * @param checkpointID The ID identifying the checkpoint. * @param checkpointTimestamp The timestamp associated with the checkpoint. */ - public void triggerCheckpointBarrier(long checkpointID, long checkpointTimestamp) { - - AbstractInvokable invokable = this.invokable; - + public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) { + final AbstractInvokable invokable = this.invokable; final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp); if (executionState == ExecutionState.RUNNING && invokable != null) { - if (invokable instanceof StatefulTask) { + if (invokable instanceof StatefulTask) { // build a local closure final StatefulTask statefulTask = (StatefulTask) invokable; final String taskName = taskNameWithSubtask; @@ -1027,12 +1027,14 @@ public class Task implements Runnable, TaskActions { try { boolean success = statefulTask.triggerCheckpoint(checkpointMetaData); if (!success) { - checkpointResponder.declineCheckpoint(jobId, getExecutionId(), checkpointMetaData); + checkpointResponder.declineCheckpoint( + getJobID(), getExecutionId(), checkpointID, + new CheckpointDeclineTaskNotReadyException(taskName)); } } catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { - failExternally(new RuntimeException( + failExternally(new Exception( "Error while triggering checkpoint for " + taskName, t)); } @@ -1042,12 +1044,19 @@ public class Task implements Runnable, TaskActions { executeAsyncCallRunnable(runnable, "Checkpoint 
Trigger for " + taskName); } else { + checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, + new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask)); + LOG.error("Task received a checkpoint request, but is not a checkpointing task - " + taskNameWithSubtask); } } else { - LOG.debug("Ignoring request to trigger a checkpoint for non-running task."); + LOG.debug("Declining checkpoint request for non-running task"); + + // send back a message that we did not do the checkpoint + checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, + new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 68e71ef..31f9dd7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1435,10 +1435,7 @@ class JobManager( if (checkpointCoordinator != null) { future { try { - if (!checkpointCoordinator.receiveDeclineMessage(declineMessage)) { - log.info("Received message for non-existing checkpoint " + - declineMessage.getCheckpointId) - } + checkpointCoordinator.receiveDeclineMessage(declineMessage) } catch { case t: Throwable => http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index b874612..5c50c02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -36,8 +36,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; -import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -311,11 +309,8 @@ public class CheckpointCoordinatorTest { assertFalse(checkpoint.isFullyAcknowledged()); // check that the vertices received the trigger checkpoint message - { - TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointId, timestamp); - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); - } + verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp); + verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); @@ -334,42 +329,19 @@ public class CheckpointCoordinatorTest { // decline checkpoint from the other task, this should cancel the checkpoint // and trigger a new one - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, 
checkpoint.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId)); assertTrue(checkpoint.isDiscarded()); - // validate that we have a new pending checkpoint - assertEquals(1, coord.getNumberOfPendingCheckpoints()); + // validate that we have no new pending checkpoint + assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew); - - assertNotNull(checkpointNew); - assertEquals(checkpointIdNew, checkpointNew.getCheckpointId()); - assertEquals(jid, checkpointNew.getJobId()); - assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks()); - assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks()); - assertEquals(0, checkpointNew.getTaskStates().size()); - assertFalse(checkpointNew.isDiscarded()); - assertFalse(checkpointNew.isFullyAcknowledged()); - assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId()); - - // check that the vertices received the new trigger checkpoint message - { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(checkpointNew.getCheckpointTimestamp())); - verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(checkpointNew.getCheckpointTimestamp())); - } - // decline again, nothing should happen // decline from the other task, nothing should happen - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp())); - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId)); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpointId)); assertTrue(checkpoint.isDiscarded()); - // should still have the same second checkpoint pending - long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - assertEquals(checkpointIdNew2, checkpointIdNew); - coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { @@ -464,7 +436,7 @@ public class CheckpointCoordinatorTest { // decline checkpoint from one of the tasks, this should cancel the checkpoint // and trigger a new one - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id)); assertTrue(checkpoint1.isDiscarded()); // validate that we have only one pending checkpoint left @@ -488,8 +460,8 @@ public class CheckpointCoordinatorTest { // decline again, nothing should happen // decline from the other task, nothing should happen - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp())); - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp())); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id)); + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id)); assertTrue(checkpoint1.isDiscarded()); coord.shutdown(JobStatus.FINISHED); @@ -1349,8 +1321,6 @@ public class CheckpointCoordinatorTest { verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew)); - NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew); - NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew); 
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew)); verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew)); } @@ -1765,11 +1735,7 @@ public class CheckpointCoordinatorTest { assertFalse("Did not trigger savepoint", savepoint1.isDone()); } - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - -/** + /** * Tests that the checkpointed partitioned and non-partitioned state is assigned properly to * the {@link Execution} upon recovery. * @@ -2328,30 +2294,6 @@ public class CheckpointCoordinatorTest { // Utilities // ------------------------------------------------------------------------ - static void sendAckMessageToCoordinator( - CheckpointCoordinator coord, - long checkpointId, JobID jid, - ExecutionJobVertex jobVertex, - JobVertexID jobVertexID, - List<KeyGroupRange> keyGroupPartitions) throws Exception { - - for (int index = 0; index < jobVertex.getParallelism(); index++) { - ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID, index); - KeyGroupsStateHandle keyGroupState = generateKeyGroupState( - jobVertexID, - keyGroupPartitions.get(index), false); - - SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null, 0); - AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jid, - jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - new CheckpointMetaData(checkpointId, 0L), - checkpointStateHandles); - - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); - } - } - public static KeyGroupsStateHandle generateKeyGroupState( JobVertexID jobVertexID, KeyGroupRange keyGroupPartition, boolean rawState) throws IOException { @@ -2467,7 +2409,7 @@ public class CheckpointCoordinatorTest { return 
generateChainedPartitionableStateHandle(statesListsMap); } - public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle( + private static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle( Map<String, List<? extends Serializable>> states) throws IOException { List<List<? extends Serializable>> namedStateSerializables = new ArrayList<>(states.size()); @@ -2495,7 +2437,7 @@ public class CheckpointCoordinatorTest { return ChainedStateHandle.wrapSingleHandle(operatorStateHandle); } - public static ExecutionJobVertex mockExecutionJobVertex( + private static ExecutionJobVertex mockExecutionJobVertex( JobVertexID jobVertexID, int parallelism, int maxParallelism) { @@ -2847,95 +2789,4 @@ public class CheckpointCoordinatorTest { Assert.assertEquals(expectedTotalPartitions, actualTotalPartitions); Assert.assertEquals(expected, actual); } - - @Test - public void testDeclineCheckpointRespectsProperties() throws Exception { - final JobID jid = new JobID(); - final long timestamp = System.currentTimeMillis(); - - // create some mock Execution vertices that receive the checkpoint trigger messages - final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); - ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); - - // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - ExternalizedCheckpointSettings.none(), - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new ExecutionVertex[] { vertex1 }, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - null, - new DisabledCheckpointStatsTracker()); - - assertEquals(0, coord.getNumberOfPendingCheckpoints()); - assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - - // trigger the first checkpoint. 
this should succeed - CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); - String targetDirectory = "xjasdkjakshdmmmxna"; - - CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory, false); - assertTrue(triggerResult.isSuccess()); - - // validate that we have a pending checkpoint - assertEquals(1, coord.getNumberOfPendingCheckpoints()); - assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - - long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); - - assertNotNull(checkpoint); - assertEquals(checkpointId, checkpoint.getCheckpointId()); - assertEquals(timestamp, checkpoint.getCheckpointTimestamp()); - assertEquals(jid, checkpoint.getJobId()); - assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks()); - assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks()); - assertEquals(0, checkpoint.getTaskStates().size()); - assertFalse(checkpoint.isDiscarded()); - assertFalse(checkpoint.isFullyAcknowledged()); - assertEquals(props, checkpoint.getProps()); - assertEquals(targetDirectory, checkpoint.getTargetDirectory()); - - { - // check that the vertices received the trigger checkpoint message - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp)); - } - - // decline checkpoint, this should cancel the checkpoint and re-trigger with correct properties - coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp())); - assertTrue(checkpoint.isDiscarded()); - - // validate that we have a new pending checkpoint - assertEquals(1, coord.getNumberOfPendingCheckpoints()); - assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - - long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); - PendingCheckpoint checkpointNew = 
coord.getPendingCheckpoints().get(checkpointIdNew); - - assertNotNull(checkpointNew); - assertEquals(checkpointIdNew, checkpointNew.getCheckpointId()); - assertEquals(jid, checkpointNew.getJobId()); - assertEquals(1, checkpointNew.getNumberOfNonAcknowledgedTasks()); - assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks()); - assertEquals(0, checkpointNew.getTaskStates().size()); - assertFalse(checkpointNew.isDiscarded()); - assertFalse(checkpointNew.isFullyAcknowledged()); - assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId()); - // Respect the properties and target directory from the initial trigger - assertEquals(props, checkpointNew.getProps()); - assertEquals(targetDirectory, checkpointNew.getTargetDirectory()); - - // check that the vertices received the new trigger checkpoint message - { - verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(checkpointNew.getCheckpointTimestamp())); - } - - coord.shutdown(JobStatus.FINISHED); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index e40b2df..7d9c521 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -476,7 +476,7 @@ public class JobManagerHARecoveryTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { throw new UnsupportedOperationException("should not be called!"); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index f2616b5..0b11730 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -159,6 +159,11 @@ public class DummyEnvironment implements Environment { } @Override + public void declineCheckpoint(long checkpointId, Throwable cause) { + throw new UnsupportedOperationException(); + } + + @Override public void failExternally(Throwable cause) { throw new UnsupportedOperationException("DummyEnvironment does not support external task failure."); } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 08b84cb..646c038 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -313,10 +313,17 @@ public class MockEnvironment implements Environment { @Override public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData) { + throw new UnsupportedOperationException(); } @Override public void acknowledgeCheckpoint(CheckpointMetaData checkpointMetaData, SubtaskState 
subtaskState) { + throw new UnsupportedOperationException(); + } + + @Override + public void declineCheckpoint(long checkpointId, Throwable cause) { + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index ab29bb0..0a522af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -205,9 +205,7 @@ public class TaskAsyncCallTest { } @Override - public void setInitialState(TaskStateHandles taskStateHandles) throws Exception { - - } + public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {} @Override public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) { @@ -232,7 +230,7 @@ public class TaskAsyncCallTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { throw new UnsupportedOperationException("Should not be called"); } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 3ccb575..66aaa44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -18,7 +18,11 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -143,7 +147,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } else { if (next.getEvent().getClass() == EndOfPartitionEvent.class) { - processEndOfPartition(next.getChannelIndex()); + processEndOfPartition(); } return next; } @@ -197,7 +201,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { "Skipping current checkpoint.", barrierId, currentCheckpointId); // let the task know we are not completing this - notifyAbort(currentCheckpointId); + notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); // abort the current checkpoint releaseBlocksAndResetBarriers(); @@ -242,7 +246,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (barrierId > currentCheckpointId) { // new checkpoint currentCheckpointId = barrierId; - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } return; } @@ -259,7 +263,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } releaseBlocksAndResetBarriers(); - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } else if (barrierId > currentCheckpointId) { // we canceled the next which also cancels the 
current @@ -273,7 +277,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentCheckpointId = barrierId; startOfAlignmentTimestamp = 0L; latestAlignmentDurationNanos = 0L; - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now) @@ -293,7 +297,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId); } - notifyAbort(barrierId); + notifyAbortOnCancellationBarrier(barrierId); } // else: trailing barrier from either @@ -301,12 +305,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler { // - the current checkpoint if it was already canceled } - private void processEndOfPartition(int channel) throws Exception { + private void processEndOfPartition() throws Exception { numClosedChannels++; if (numBarriersReceived > 0) { // let the task know we skip a checkpoint - notifyAbort(currentCheckpointId); + notifyAbort(currentCheckpointId, new InputEndOfStreamException()); // no chance to complete this checkpoint releaseBlocksAndResetBarriers(); @@ -326,9 +330,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } } - private void notifyAbort(long checkpointId) throws Exception { + private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception { + notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException()); + } + + private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception { if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java ---------------------------------------------------------------------- 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index cce7cb4..72838bc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -234,7 +235,8 @@ public class BarrierTracker implements CheckpointBarrierHandler { private void notifyAbort(long checkpointId) throws Exception { if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId); + toNotifyOnCheckpoint.abortCheckpointOnBarrier( + checkpointId, new CheckpointDeclineOnCancellationBarrierException()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 1e03a96..bd34044 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; 
import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -540,22 +542,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } @Override - public void abortCheckpointOnBarrier(long checkpointId) throws Exception { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception { LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName()); + // notify the coordinator that we decline this checkpoint + getEnvironment().declineCheckpoint(checkpointId, cause); + + // notify all downstream operators that they should not wait for a barrier from us synchronized (lock) { - if (isRunning) { - operatorChain.broadcastCheckpointCancelMarker(checkpointId); - } + operatorChain.broadcastCheckpointCancelMarker(checkpointId); } } private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception { - LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName()); synchronized (lock) { if (isRunning) { + // we can do a checkpoint // Since both state checkpointing and downstream barrier emission occurs in this // lock scope, they are an atomic operation regardless of the order in which they occur. 
@@ -566,7 +570,18 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> checkpointState(checkpointMetaData); return true; - } else { + } + else { + // we cannot perform our checkpoint - let the downstream operators know that they + // should not wait for any input from this operator + + // we cannot broadcast the cancellation markers on the 'operator chain', because it may not + // yet be created + final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()); + for (ResultPartitionWriter output : getEnvironment().getAllWriters()) { + output.writeEventToAllChannels(message); + } + return false; } } @@ -832,10 +847,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private final List<OperatorSnapshotResult> snapshotInProgressList; - RunnableFuture<KeyGroupsStateHandle> futureKeyedBackendStateHandles; - RunnableFuture<KeyGroupsStateHandle> futureKeyedStreamStateHandles; + private RunnableFuture<KeyGroupsStateHandle> futureKeyedBackendStateHandles; + private RunnableFuture<KeyGroupsStateHandle> futureKeyedStreamStateHandles; - List<StreamStateHandle> nonPartitionedStateHandles; + private List<StreamStateHandle> nonPartitionedStateHandles; private final CheckpointMetaData checkpointMetaData; http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index 8754e10..446cc77 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -20,6 +20,8 @@ package 
org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -52,6 +54,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -571,7 +574,7 @@ public class BarrierBufferTest { check(sequence[12], buffer.getNextNonBlocked()); assertEquals(3L, buffer.getCurrentCheckpointId()); validateAlignmentTime(startTs, buffer); - verify(toNotify).abortCheckpointOnBarrier(2L); + verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class)); check(sequence[16], buffer.getNextNonBlocked()); // checkpoint 3 alignment in progress @@ -579,7 +582,7 @@ public class BarrierBufferTest { // checkpoint 3 aborted (end of partition) check(sequence[20], buffer.getNextNonBlocked()); - verify(toNotify).abortCheckpointOnBarrier(3L); + verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); // replay buffered data from checkpoint 3 check(sequence[18], buffer.getNextNonBlocked()); @@ -1004,13 +1007,13 @@ public class BarrierBufferTest { check(sequence[6], buffer.getNextNonBlocked()); assertEquals(5L, buffer.getCurrentCheckpointId()); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); - verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + 
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[8], buffer.getNextNonBlocked()); assertEquals(6L, buffer.getCurrentCheckpointId()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); buffer.cleanup(); @@ -1078,7 +1081,7 @@ public class BarrierBufferTest { // canceled checkpoint on last barrier startTs = System.nanoTime(); check(sequence[12], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); validateAlignmentTime(startTs, buffer); check(sequence[13], buffer.getNextNonBlocked()); @@ -1093,7 +1096,7 @@ public class BarrierBufferTest { // this checkpoint gets immediately canceled check(sequence[24], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); // some buffers @@ -1109,7 +1112,7 @@ public class BarrierBufferTest { check(sequence[33], buffer.getNextNonBlocked()); check(sequence[37], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); // all done @@ -1172,7 +1175,7 @@ public class BarrierBufferTest { // re-read the queued cancellation barriers check(sequence[9], 
buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(2L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[10], buffer.getNextNonBlocked()); @@ -1189,7 +1192,7 @@ public class BarrierBufferTest { // no further checkpoint (abort) notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); - verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class)); // all done assertNull(buffer.getNextNonBlocked()); @@ -1258,7 +1261,7 @@ public class BarrierBufferTest { // cancelled by cancellation barrier check(sequence[4], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).abortCheckpointOnBarrier(1L); + verify(toNotify).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class)); // the next checkpoint alignment starts now startTs = System.nanoTime(); @@ -1270,7 +1273,7 @@ public class BarrierBufferTest { // checkpoint done check(sequence[7], buffer.getNextNonBlocked()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); // queued data check(sequence[10], buffer.getNextNonBlocked()); @@ -1290,7 +1293,7 @@ public class BarrierBufferTest { // check overall notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); - verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } /** @@ -1342,7 +1345,7 @@ public class BarrierBufferTest { // future barrier aborts checkpoint 
startTs = System.nanoTime(); check(sequence[3], buffer.getNextNonBlocked()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(3L); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); check(sequence[4], buffer.getNextNonBlocked()); // alignment of next checkpoint @@ -1371,7 +1374,7 @@ public class BarrierBufferTest { // check overall notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class)); - verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); } // ------------------------------------------------------------------------ @@ -1479,7 +1482,7 @@ public class BarrierBufferTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) {} + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {} @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index fa3363e..7ae144d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -482,7 +482,7 @@ public class BarrierTrackerTest { } @Override - public void abortCheckpointOnBarrier(long checkpointId) { + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) { assertTrue("More checkpoints than expected", i < checkpointIDs.length); final long expectedId = 
checkpointIDs[i++]; http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 42d7cec..be93f6a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; @@ -305,6 +306,7 @@ public class OneInputStreamTaskTest extends TestLogger { testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0); testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1); + expectedOutput.add(new CancelCheckpointMarker(0)); expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); expectedOutput.add(new CheckpointBarrier(1, 1)); 
http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 2376a60..52daf6f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -322,6 +322,9 @@ public class StreamMockEnvironment implements Environment { } @Override + public void declineCheckpoint(long checkpointId, Throwable cause) {} + + @Override public void failExternally(Throwable cause) { this.wasFailedExternally = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java new file mode 100644 index 0000000..95828f8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.co.CoStreamMap; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class StreamTaskCancellationBarrierTest { + + /** + * This test checks that tasks emit a proper cancel checkpoint barrier, if a "trigger checkpoint" message + * comes before they are ready. 
+ */ + @Test + public void testEmitCancellationBarrierWhenNotReady() throws Exception { + StreamTask<String, ?> task = new InitBlockingTask(); + StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO); + + // start the test - this cannot succeed across the 'init()' method + testHarness.invoke(); + + // tell the task to commence a checkpoint + boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis())); + assertFalse("task triggered checkpoint though not ready", result); + + // a cancellation barrier should be downstream + Object emitted = testHarness.getOutput().poll(); + assertNotNull("nothing emitted", emitted); + assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker); + assertEquals("wrong checkpoint id", 41L, ((CancelCheckpointMarker) emitted).getCheckpointId()); + } + + /** + * This test verifies (for one-input tasks) that the Stream tasks react in the following way to + * receiving a checkpoint cancellation barrier: + * + * - send a "decline checkpoint" notification out (to the JobManager) + * - emit a cancellation barrier downstream + */ + @Test + public void testDeclineCallOnCancelBarrierOneInput() throws Exception { + + OneInputStreamTask<String, String> task = new OneInputStreamTask<String, String>(); + OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( + task, + 1, 2, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap()); + streamConfig.setStreamOperator(mapOperator); + + StreamMockEnvironment environment = spy(testHarness.createEnvironment()); + + // start the task + testHarness.invoke(environment); + testHarness.waitForTaskRunning(); + + // emit cancellation barriers + testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1); + 
testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0); + testHarness.waitForInputProcessing(); + + // the decline call should go to the coordinator + verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + + // a cancellation barrier should be downstream + Object result = testHarness.getOutput().poll(); + assertNotNull("nothing emitted", result); + assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker); + assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId()); + + // cancel and shutdown + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + + /** + * This test verifies (for two-input tasks) that the Stream tasks react in the following way to + * receiving a checkpoint cancellation barrier: + * + * - send a "decline checkpoint" notification out (to the JobManager) + * - emit a cancellation barrier downstream + */ + @Test + public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception { + + TwoInputStreamTask<String, String, String> task = new TwoInputStreamTask<String, String, String>(); + TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>( + task, + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap()); + streamConfig.setStreamOperator(op); + + StreamMockEnvironment environment = spy(testHarness.createEnvironment()); + + // start the task + testHarness.invoke(environment); + testHarness.waitForTaskRunning(); + + // emit cancellation barriers + testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0); + testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0); + testHarness.waitForInputProcessing(); + + // the decline call should go to the coordinator + 
verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + + // a cancellation barrier should be downstream + Object result = testHarness.getOutput().poll(); + assertNotNull("nothing emitted", result); + assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker); + assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId()); + + // cancel and shutdown + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + + // ------------------------------------------------------------------------ + // test tasks / functions + // ------------------------------------------------------------------------ + + private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> { + + private final Object lock = new Object(); + private volatile boolean running = true; + + @Override + protected void init() throws Exception { + synchronized (lock) { + while (running) { + lock.wait(); + } + } + } + + @Override + protected void run() throws Exception {} + + @Override + protected void cleanup() throws Exception {} + + @Override + protected void cancelTask() throws Exception { + running = false; + synchronized (lock) { + lock.notifyAll(); + } + } + } + + private static class IdentityMap implements MapFunction<String, String> { + private static final long serialVersionUID = 1L; + + @Override + public String map(String value) throws Exception { + return value; + } + } + + private static class UnionCoMap implements CoMapFunction<String, String, String> { + private static final long serialVersionUID = 1L; + + @Override + public String map1(String value) throws Exception { + return value; + } + + @Override + public String map2(String value) throws Exception { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48a48139/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index c531c0d..b71e38d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; + import org.junit.Assert; import java.io.IOException; @@ -150,21 +151,18 @@ public class StreamTaskTestHarness<OUT> { } + public StreamMockEnvironment createEnvironment() { + return new StreamMockEnvironment( + jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize); + } + /** * Invoke the Task. This resets the output of any previous invocation. This will start a new * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the * Task thread to finish running. 
*/ public void invoke() throws Exception { - mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig, - memorySize, new MockInputSplitProvider(), bufferSize); - task.setEnvironment(mockEnv); - - initializeInputs(); - initializeOutput(); - - taskThread = new TaskThread(task); - taskThread.start(); + invoke(createEnvironment()); } /** @@ -237,7 +235,7 @@ public class StreamTaskTestHarness<OUT> { if (taskThread.task instanceof StreamTask) { StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task; while (!streamTask.isRunning()) { - Thread.sleep(100); + Thread.sleep(10); if (!taskThread.isAlive()) { if (taskThread.getError() != null) { throw new Exception("Task Thread failed due to an error.", taskThread.getError()); @@ -314,15 +312,14 @@ public class StreamTaskTestHarness<OUT> { /** * This only returns after all input queues are empty. */ - public void waitForInputProcessing() { - + public void waitForInputProcessing() throws Exception { - // first wait for all input queues to be empty - try { - Thread.sleep(1); - } catch (InterruptedException ignored) {} - while (true) { + Throwable error = taskThread.getError(); + if (error != null) { + throw new Exception("Exception in the task thread", error); + } + boolean allEmpty = true; for (int i = 0; i < numInputGates; i++) { if (!inputGates[i].allQueuesEmpty()) {
