This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c4b0e8f [FLINK-10724] Refactor failure handling in check point
coordinator
c4b0e8f is described below
commit c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df
Author: vinoyang <[email protected]>
AuthorDate: Mon Apr 29 15:56:04 2019 +0800
[FLINK-10724] Refactor failure handling in check point coordinator
This closes #7571.
---
.../runtime/checkpoint/CheckpointCoordinator.java | 89 +++++++++++----------
.../runtime/checkpoint/CheckpointException.java | 30 +++++--
...ineReason.java => CheckpointFailureReason.java} | 28 ++++++-
.../checkpoint/CheckpointTriggerException.java | 42 ----------
.../checkpoint/CheckpointTriggerResult.java | 92 ----------------------
.../runtime/checkpoint/PendingCheckpoint.java | 51 +++---------
.../executiongraph/failover/FailoverRegion.java | 5 +-
.../apache/flink/runtime/jobmaster/JobMaster.java | 12 +--
.../checkpoint/CheckpointCoordinatorTest.java | 38 +++++----
.../runtime/checkpoint/PendingCheckpointTest.java | 30 +++----
.../jobmaster/JobMasterTriggerSavepointITCase.java | 4 +-
.../test/streaming/runtime/TimestampITCase.java | 4 +-
12 files changed, 160 insertions(+), 265 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e6cc5d3..c7f59a7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -338,7 +338,7 @@ public class CheckpointCoordinator {
// clear and discard all pending checkpoints
for (PendingCheckpoint pending :
pendingCheckpoints.values()) {
- pending.abortError(new
Exception("Checkpoint Coordinator is shutting down"));
+
pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}
pendingCheckpoints.clear();
@@ -405,17 +405,17 @@ public class CheckpointCoordinator {
checkNotNull(checkpointProperties);
- CheckpointTriggerResult triggerResult = triggerCheckpoint(
- timestamp,
- checkpointProperties,
- targetLocation,
- false,
- advanceToEndOfEventTime);
-
- if (triggerResult.isSuccess()) {
- return
triggerResult.getPendingCheckpoint().getCompletionFuture();
- } else {
- Throwable cause = new
CheckpointTriggerException("Failed to trigger savepoint.",
triggerResult.getFailureReason());
+ try {
+ PendingCheckpoint pendingCheckpoint = triggerCheckpoint(
+ timestamp,
+ checkpointProperties,
+ targetLocation,
+ false,
+ advanceToEndOfEventTime);
+
+ return pendingCheckpoint.getCompletionFuture();
+ } catch (CheckpointException e) {
+ Throwable cause = new CheckpointException("Failed to
trigger savepoint.", e.getCheckpointFailureReason());
return FutureUtils.completedExceptionally(cause);
}
}
@@ -431,16 +431,21 @@ public class CheckpointCoordinator {
* @return <code>true</code> if triggering the checkpoint succeeded.
*/
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
- return triggerCheckpoint(timestamp, checkpointProperties, null,
isPeriodic, false).isSuccess();
+ try {
+ triggerCheckpoint(timestamp, checkpointProperties,
null, isPeriodic, false);
+ return true;
+ } catch (CheckpointException e) {
+ return false;
+ }
}
@VisibleForTesting
- public CheckpointTriggerResult triggerCheckpoint(
+ public PendingCheckpoint triggerCheckpoint(
long timestamp,
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
- boolean advanceToEndOfTime) {
+ boolean advanceToEndOfTime) throws CheckpointException {
if (advanceToEndOfTime && !(props.isSynchronous() &&
props.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous
savepoints are allowed to advance the watermark to MAX.");
@@ -450,12 +455,12 @@ public class CheckpointCoordinator {
synchronized (lock) {
// abort if the coordinator has been shutdown in the
meantime
if (shutdown) {
- return new
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+ throw new
CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
}
// Don't allow periodic checkpoint if scheduling has
been disabled
if (isPeriodic && !periodicScheduling) {
- return new
CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
+ throw new
CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
}
// validate whether the checkpoint can be triggered,
with respect to the limit of
@@ -465,7 +470,7 @@ public class CheckpointCoordinator {
// sanity check: there should never be more
than one trigger request queued
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another
checkpoint for job {} while one was queued already.", job);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+ throw new
CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
}
// if too many checkpoints are currently in
progress, we need to mark that a request is queued
@@ -475,7 +480,7 @@ public class CheckpointCoordinator {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
- return new
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+ throw new
CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
// make sure the minimum interval between
checkpoints has passed
@@ -492,7 +497,7 @@ public class CheckpointCoordinator {
new ScheduledTrigger(),
durationTillNextMillis,
baseInterval, TimeUnit.MILLISECONDS);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+ throw new
CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
}
@@ -506,7 +511,7 @@ public class CheckpointCoordinator {
LOG.info("Checkpoint triggering task {} of job
{} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+ throw new
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
} else if (ee.getState() == ExecutionState.RUNNING) {
executions[i] = ee;
} else {
@@ -515,7 +520,7 @@ public class CheckpointCoordinator {
job,
ExecutionState.RUNNING,
ee.getState());
- return new
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+ throw new
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -531,7 +536,7 @@ public class CheckpointCoordinator {
LOG.info("Checkpoint acknowledging task {} of
job {} is not being executed at the moment. Aborting checkpoint.",
ev.getTaskNameWithSubtaskIndex(),
job);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+ throw new
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -561,7 +566,7 @@ public class CheckpointCoordinator {
job,
numUnsuccessful,
t);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+ throw new
CheckpointException(CheckpointFailureReason.EXCEPTION, t);
}
final PendingCheckpoint checkpoint = new
PendingCheckpoint(
@@ -590,7 +595,7 @@ public class CheckpointCoordinator {
if (!checkpoint.isDiscarded()) {
LOG.info("Checkpoint {} of job
{} expired before completing.", checkpointID, job);
- checkpoint.abortExpired();
+
checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
pendingCheckpoints.remove(checkpointID);
rememberRecentCheckpointId(checkpointID);
@@ -605,12 +610,12 @@ public class CheckpointCoordinator {
// since we released the lock in the
meantime, we need to re-check
// that the conditions still hold.
if (shutdown) {
- return new
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+ throw new
CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
}
else if (!props.forceCheckpoint()) {
if (triggerRequestQueued) {
LOG.warn("Trying to
trigger another checkpoint for job {} while one was queued already.", job);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+ throw new
CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
}
if (pendingCheckpoints.size()
>= maxConcurrentCheckpointAttempts) {
@@ -619,7 +624,7 @@ public class CheckpointCoordinator {
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
- return new
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+ throw new
CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
}
// make sure the minimum
interval between checkpoints has passed
@@ -637,7 +642,7 @@ public class CheckpointCoordinator {
new
ScheduledTrigger(),
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
- return new
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+ throw new
CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
@@ -677,7 +682,7 @@ public class CheckpointCoordinator {
}
numUnsuccessfulCheckpointsTriggers.set(0);
- return new CheckpointTriggerResult(checkpoint);
+ return checkpoint;
}
catch (Throwable t) {
// guard the map against concurrent
modifications
@@ -690,7 +695,7 @@ public class CheckpointCoordinator {
checkpointID, job,
numUnsuccessful, t);
if (!checkpoint.isDiscarded()) {
- checkpoint.abortError(new
Exception("Failed to trigger checkpoint", t));
+
checkpoint.abort(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
}
try {
@@ -700,7 +705,7 @@ public class CheckpointCoordinator {
LOG.warn("Cannot dispose failed
checkpoint storage location {}", checkpointStorageLocation, t2);
}
- return new
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+ throw new
CheckpointException(CheckpointFailureReason.EXCEPTION, t);
}
} // end trigger lock
@@ -879,10 +884,11 @@ public class CheckpointCoordinator {
catch (Exception e1) {
// abort the current pending checkpoint if we
fails to finalize the pending checkpoint.
if (!pendingCheckpoint.isDiscarded()) {
- pendingCheckpoint.abortError(e1);
+
pendingCheckpoint.abort(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
e1);
}
- throw new CheckpointException("Could not
finalize the pending checkpoint " + checkpointId + '.', e1);
+ throw new CheckpointException("Could not
finalize the pending checkpoint " + checkpointId + '.',
+
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
// the pending checkpoint must be discarded after the
finalization
@@ -903,7 +909,8 @@ public class CheckpointCoordinator {
}
});
- throw new CheckpointException("Could not
complete the pending checkpoint " + checkpointId + '.', exception);
+ throw new CheckpointException("Could not
complete the pending checkpoint " + checkpointId + '.',
+
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
}
} finally {
pendingCheckpoints.remove(checkpointId);
@@ -984,7 +991,7 @@ public class CheckpointCoordinator {
// remove all pending checkpoints that are lesser than
the current completed checkpoint
if (p.getCheckpointId() < checkpointId &&
p.canBeSubsumed()) {
rememberRecentCheckpointId(p.getCheckpointId());
- p.abortSubsumed();
+
p.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
entries.remove();
}
}
@@ -1244,7 +1251,7 @@ public class CheckpointCoordinator {
currentPeriodicTrigger = null;
}
- abortPendingCheckpoints(new Exception("Checkpoint
Coordinator is suspending."));
+ abortPendingCheckpoints(new
CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND));
numUnsuccessfulCheckpointsTriggers.set(0);
}
@@ -1254,10 +1261,10 @@ public class CheckpointCoordinator {
* Aborts all the pending checkpoints due to en exception.
* @param exception The exception.
*/
- public void abortPendingCheckpoints(Exception exception) {
+ public void abortPendingCheckpoints(CheckpointException exception) {
synchronized (lock) {
for (PendingCheckpoint p : pendingCheckpoints.values())
{
- p.abortError(exception);
+ p.abort(exception.getCheckpointFailureReason());
}
pendingCheckpoints.clear();
@@ -1312,9 +1319,9 @@ public class CheckpointCoordinator {
LOG.info("Discarding checkpoint {} of job {}.", checkpointId,
job, cause);
if (cause == null || cause instanceof
CheckpointDeclineException) {
- pendingCheckpoint.abortDeclined();
+
pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
} else {
- pendingCheckpoint.abortError(cause);
+
pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
}
rememberRecentCheckpointId(checkpointId);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
index 707878c..c0bc2d1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -18,18 +18,38 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.util.Preconditions;
+
/**
* Base class for checkpoint related exceptions.
*/
public class CheckpointException extends Exception {
- private static final long serialVersionUID = -4341865597039002540L;
+ private static final long serialVersionUID = 3257526119022486948L;
+
+ private final CheckpointFailureReason checkpointFailureReason;
+
+ public CheckpointException(CheckpointFailureReason failureReason) {
+ super(failureReason.message());
+ this.checkpointFailureReason =
Preconditions.checkNotNull(failureReason);
+ }
+
+ public CheckpointException(String message, CheckpointFailureReason
failureReason) {
+ super(message + " Failure reason: " + failureReason.message());
+ this.checkpointFailureReason =
Preconditions.checkNotNull(failureReason);
+ }
+
+ public CheckpointException(CheckpointFailureReason failureReason,
Throwable cause) {
+ super(failureReason.message(), cause);
+ this.checkpointFailureReason =
Preconditions.checkNotNull(failureReason);
+ }
- public CheckpointException(String message, Throwable cause) {
- super(message, cause);
+ public CheckpointException(String message, CheckpointFailureReason
failureReason, Throwable cause) {
+ super(message + " Failure reason: " + failureReason.message(),
cause);
+ this.checkpointFailureReason =
Preconditions.checkNotNull(failureReason);
}
- public CheckpointException(String message) {
- super(message);
+ public CheckpointFailureReason getCheckpointFailureReason() {
+ return checkpointFailureReason;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
similarity index 69%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 41c50cc0..35f457a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.checkpoint;
/**
- * Various reasons why a checkpoint was declined.
+ * Various reasons why a checkpoint was failure.
*/
-public enum CheckpointDeclineReason {
+public enum CheckpointFailureReason {
COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
@@ -38,13 +38,33 @@ public enum CheckpointDeclineReason {
EXCEPTION("An Exception occurred while triggering the checkpoint."),
- EXPIRED("The checkpoint expired before triggering was complete");
+ EXPIRED("The checkpoint expired before triggering was complete"),
+
+ CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+
+ CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."),
+
+ CHECKPOINT_DECLINED("Checkpoint was declined (tasks not ready)."),
+
+ CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+
+ CHECKPOINT_COORDINATOR_SUSPEND("Checkpoint Coordinator is suspending."),
+
+ JOB_FAILURE("The job has failed."),
+
+ JOB_FAILOVER_REGION("FailoverRegion is restarting."),
+
+ TASK_CHECKPOINT_FAILURE("Task local checkpoint failure."),
+
+ FINALIZE_CHECKPOINT_FAILURE("Failure to finalize checkpoint."),
+
+ TRIGGER_CHECKPOINT_FAILURE("Trigger checkpoint failure.");
//
------------------------------------------------------------------------
private final String message;
- CheckpointDeclineReason(String message) {
+ CheckpointFailureReason(String message) {
this.message = message;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
deleted file mode 100644
index cb0402a..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Exceptions which indicate that a checkpoint triggering has failed.
- *
- */
-public class CheckpointTriggerException extends FlinkException {
-
- private static final long serialVersionUID = -3330160816161901752L;
-
- private final CheckpointDeclineReason checkpointDeclineReason;
-
- public CheckpointTriggerException(String message,
CheckpointDeclineReason checkpointDeclineReason) {
- super(message + " Decline reason: " +
checkpointDeclineReason.message());
- this.checkpointDeclineReason =
Preconditions.checkNotNull(checkpointDeclineReason);
- }
-
- public CheckpointDeclineReason getCheckpointDeclineReason() {
- return checkpointDeclineReason;
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
deleted file mode 100644
index 8689f72..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The result of triggering a checkpoint. May either be a declined checkpoint
- * trigger attempt, or a pending checkpoint.
- */
-public class CheckpointTriggerResult {
-
- /** If success, the pending checkpoint created after the successfully
trigger, otherwise null */
- private final PendingCheckpoint success;
-
- /** If failure, the reason why the triggering was declined, otherwise
null. */
- private final CheckpointDeclineReason failure;
-
- //
------------------------------------------------------------------------
-
- /**
- * Creates a successful checkpoint trigger result.
- *
- * @param success The pending checkpoint created after the successfully
trigger.
- */
- CheckpointTriggerResult(PendingCheckpoint success) {
- this.success = checkNotNull(success);
- this.failure = null;
- }
-
- /**
- * Creates a failed checkpoint trigger result.
- *
- * @param failure The reason why the checkpoint could not be triggered.
- */
- CheckpointTriggerResult(CheckpointDeclineReason failure) {
- this.success = null;
- this.failure = checkNotNull(failure);
- }
-
- //
------------------------------------------------------------------------
-
- public boolean isSuccess() {
- return success != null;
- }
-
- public boolean isFailure() {
- return failure != null;
- }
-
- public PendingCheckpoint getPendingCheckpoint() {
- if (success != null) {
- return success;
- } else {
- throw new IllegalStateException("Checkpoint triggering
failed");
- }
- }
-
- public CheckpointDeclineReason getFailureReason() {
- if (failure != null) {
- return failure;
- } else {
- throw new IllegalStateException("Checkpoint triggering
was successful");
- }
- }
-
- //
------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "CheckpointTriggerResult(" +
- (isSuccess() ?
- ("success: " + success) :
- ("failure: " +
failure.message())) + ")";
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1bc6b0e..d03c28f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -405,54 +404,30 @@ public class PendingCheckpoint {
//
------------------------------------------------------------------------
/**
- * Aborts a checkpoint because it expired (took too long).
+ * Aborts a checkpoint with reason and cause.
*/
- public void abortExpired() {
+ public void abort(CheckpointFailureReason reason, Throwable cause) {
try {
- Exception cause = new Exception("Checkpoint expired
before completing");
- onCompletionPromise.completeExceptionally(cause);
- reportFailedCheckpoint(cause);
+ CheckpointException exception = new
CheckpointException(reason, cause);
+ onCompletionPromise.completeExceptionally(exception);
+ reportFailedCheckpoint(exception);
+ assertAbortSubsumedForced(reason);
} finally {
dispose(true);
}
}
/**
- * Aborts the pending checkpoint because a newer completed checkpoint
subsumed it.
+ * Aborts a checkpoint with reason and cause.
*/
- public void abortSubsumed() {
- try {
- Exception cause = new Exception("Checkpoints has been
subsumed");
- onCompletionPromise.completeExceptionally(cause);
- reportFailedCheckpoint(cause);
-
- if (props.forceCheckpoint()) {
- throw new IllegalStateException("Bug: forced
checkpoints must never be subsumed");
- }
- } finally {
- dispose(true);
- }
- }
-
-
- public void abortDeclined() {
- abortWithCause(new Exception("Checkpoint was declined (tasks
not ready)"));
- }
-
- /**
- * Aborts the pending checkpoint due to an error.
- * @param cause The error's exception.
- */
- public void abortError(@Nonnull Throwable cause) {
- abortWithCause(new Exception("Checkpoint failed: " +
cause.getMessage(), cause));
+ public void abort(CheckpointFailureReason reason) {
+ abort(reason, null);
}
- private void abortWithCause(@Nonnull Exception cause) {
- try {
- onCompletionPromise.completeExceptionally(cause);
- reportFailedCheckpoint(cause);
- } finally {
- dispose(true);
+ private void assertAbortSubsumedForced(CheckpointFailureReason reason) {
+ if (props.forceCheckpoint() && reason ==
CheckpointFailureReason.CHECKPOINT_SUBSUMED) {
+ throw new IllegalStateException("Bug: forced
checkpoints must never be subsumed, " +
+ "the abort reason is : " + reason.message());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 635a7f5..4cc2dc6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.executiongraph.failover;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -210,7 +212,8 @@ public class FailoverRegion {
// we restart the checkpoint scheduler
for
// i) enable new checkpoint could be
triggered without waiting for last checkpoint expired.
// ii) ensure the EXACTLY_ONCE
semantics if needed.
-
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new
Exception("FailoverRegion is restarting."));
+
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+ new
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
tasks, false, true);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1bbfb62..f912f45 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,9 +31,9 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -1508,13 +1508,13 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
(String savepointPath, Throwable throwable) -> {
if (throwable != null) {
final Throwable
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- if (strippedThrowable
instanceof CheckpointTriggerException) {
- final
CheckpointTriggerException checkpointTriggerException =
(CheckpointTriggerException) strippedThrowable;
+ if (strippedThrowable
instanceof CheckpointException) {
+ final
CheckpointException checkpointException = (CheckpointException)
strippedThrowable;
- if
(checkpointTriggerException.getCheckpointDeclineReason() ==
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+ if
(checkpointException.getCheckpointFailureReason() ==
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
return
lastInternalSavepoint;
} else {
- throw new
CompletionException(checkpointTriggerException);
+ throw new
CompletionException(checkpointException);
}
} else {
throw new
CompletionException(strippedThrowable);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 437b084..e1f8f9c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -3139,25 +3139,29 @@ public class CheckpointCoordinatorTest extends
TestLogger {
SharedStateRegistry.DEFAULT_FACTORY);
// Periodic
- CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
- System.currentTimeMillis(),
-
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- true,
- false);
-
- assertTrue(triggerResult.isFailure());
-
assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN,
triggerResult.getFailureReason());
+ try {
+ coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+ fail("The triggerCheckpoint call expected an
exception");
+ } catch (CheckpointException e) {
+
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
e.getCheckpointFailureReason());
+ }
// Not periodic
- triggerResult = coord.triggerCheckpoint(
- System.currentTimeMillis(),
-
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- false,
- false);
-
- assertFalse(triggerResult.isFailure());
+ try {
+ coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ false,
+ false);
+ } catch (CheckpointException e) {
+ fail("Unexpected exception : " +
e.getCheckpointFailureReason().message());
+ }
}
private void testCreateKeyGroupPartitions(int maxParallelism, int
parallelism) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 7554d4a..6cce28f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -93,7 +93,7 @@ public class PendingCheckpointTest {
assertFalse(pending.canBeSubsumed());
try {
- pending.abortSubsumed();
+
pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected
@@ -113,7 +113,7 @@ public class PendingCheckpointTest {
assertFalse(pending.canBeSubsumed());
try {
- pending.abortSubsumed();
+
pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected
@@ -133,7 +133,7 @@ public class PendingCheckpointTest {
CompletableFuture<CompletedCheckpoint> future =
pending.getCompletionFuture();
assertFalse(future.isDone());
- pending.abortDeclined();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
assertTrue(future.isDone());
// Abort expired
@@ -141,7 +141,7 @@ public class PendingCheckpointTest {
future = pending.getCompletionFuture();
assertFalse(future.isDone());
- pending.abortExpired();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
assertTrue(future.isDone());
// Abort subsumed
@@ -149,7 +149,7 @@ public class PendingCheckpointTest {
future = pending.getCompletionFuture();
assertFalse(future.isDone());
- pending.abortSubsumed();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
assertTrue(future.isDone());
// Finalize (all ACK'd)
@@ -191,7 +191,7 @@ public class PendingCheckpointTest {
PendingCheckpoint pending = createPendingCheckpoint(props,
executor);
setTaskState(pending, state);
- pending.abortDeclined();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
// execute asynchronous discard operation
executor.runQueuedCommands();
verify(state, times(1)).discardState();
@@ -202,7 +202,7 @@ public class PendingCheckpointTest {
pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
- pending.abortError(new Exception("Expected Test Exception"));
+ pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, new
Exception("Expected Test Exception"));
// execute asynchronous discard operation
executor.runQueuedCommands();
verify(state, times(1)).discardState();
@@ -213,7 +213,7 @@ public class PendingCheckpointTest {
pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
- pending.abortExpired();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
// execute asynchronous discard operation
executor.runQueuedCommands();
verify(state, times(1)).discardState();
@@ -224,7 +224,7 @@ public class PendingCheckpointTest {
pending = createPendingCheckpoint(props, executor);
setTaskState(pending, state);
- pending.abortSubsumed();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
// execute asynchronous discard operation
executor.runQueuedCommands();
verify(state, times(1)).discardState();
@@ -256,7 +256,7 @@ public class PendingCheckpointTest {
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
- pending.abortSubsumed();
+
pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
verify(callback,
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
@@ -267,7 +267,7 @@ public class PendingCheckpointTest {
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
- pending.abortDeclined();
+
pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
verify(callback,
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
@@ -278,7 +278,7 @@ public class PendingCheckpointTest {
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
- pending.abortError(new Exception("Expected test
error"));
+
pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, new
Exception("Expected test error"));
verify(callback,
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
@@ -289,7 +289,7 @@ public class PendingCheckpointTest {
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
pending.setStatsCallback(callback);
- pending.abortExpired();
+
pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
verify(callback,
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
}
}
@@ -327,7 +327,7 @@ public class PendingCheckpointTest {
final CheckpointProperties props = new
CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true,
true);
PendingCheckpoint aborted = createPendingCheckpoint(props);
- aborted.abortDeclined();
+ aborted.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
assertTrue(aborted.isDiscarded());
assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
@@ -335,7 +335,7 @@ public class PendingCheckpointTest {
ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
assertTrue(pending.setCancellerHandle(canceller));
- pending.abortDeclined();
+ pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
verify(canceller).cancel(false);
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index b781e41..75e9ed1 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
@@ -160,7 +160,7 @@ public class JobMasterTriggerSavepointITCase extends
AbstractTestBase {
try {
cancelWithSavepoint();
} catch (Exception e) {
- assertThat(ExceptionUtils.findThrowable(e,
CheckpointTriggerException.class).isPresent(), equalTo(true));
+ assertThat(ExceptionUtils.findThrowable(e,
CheckpointException.class).isPresent(), equalTo(true));
}
final JobStatus jobStatus =
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2665d09..2a1b824 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -198,7 +198,7 @@ public class TimestampITCase extends TestLogger {
}
catch (Exception e) {
if (
-
!(e.getCause() instanceof CheckpointTriggerException) ||
+
!(e.getCause() instanceof CheckpointException) ||
!e.getCause().getMessage().contains("Not all required tasks are currently
running.")
) {
throw e;