This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b760d55 [FLINK-11662] Disable task to fail on checkpoint errors
b760d55 is described below
commit b760d556ca3757f3d0d1b8c81e5182b1bcc3dba3
Author: Yun Tang <[email protected]>
AuthorDate: Thu Jun 27 19:40:59 2019 +0800
[FLINK-11662] Disable task to fail on checkpoint errors
This closes #8745.
---
.../apache/flink/api/common/ExecutionConfig.java | 19 ++--
.../tests/DataStreamAllroundTestJobFactory.java | 18 ++--
.../checkpoint/CheckpointFailureManager.java | 2 +-
.../tasks/CheckpointCoordinatorConfiguration.java | 4 +-
.../api/environment/CheckpointConfig.java | 67 +++++++++++--
.../api/graph/StreamingJobGraphGenerator.java | 7 +-
.../api/operators/AbstractStreamOperator.java | 4 +-
.../tasks/CheckpointExceptionHandlerFactory.java | 26 +----
.../flink/streaming/runtime/tasks/StreamTask.java | 53 ++--------
...heckpointExceptionHandlerConfigurationTest.java | 107 ++++++---------------
.../tasks/CheckpointExceptionHandlerTest.java | 21 +---
.../streaming/runtime/tasks/StreamTaskTest.java | 32 ++----
.../tasks/TaskCheckpointingBehaviourTest.java | 38 +-------
13 files changed, 143 insertions(+), 255 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index b6475d5..fd3b358 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -154,7 +154,10 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
/** This flag defines if we use compression for the state snapshot data
or not. Default: false */
private boolean useSnapshotCompression = false;
- /** Determines if a task fails or not if there is an error in writing
its checkpoint data. Default: true */
+ /**
+ * @deprecated Should no longer be used because tasks no longer fail directly on checkpoint errors.
+ */
+ @Deprecated
private boolean failTaskOnCheckpointError = true;
/** The default input dependency constraint to schedule tasks. */
@@ -948,20 +951,22 @@ public class ExecutionConfig implements Serializable,
Archiveable<ArchivedExecut
}
/**
- * This method is visible because of the way the configuration is
currently forwarded from the checkpoint config to
- * the task. This should not be called by the user, please use
CheckpointConfig.isFailTaskOnCheckpointError()
- * instead.
+ * @deprecated The value returned by this method is no longer used, since the configuration is no longer forwarded
+ * from the checkpoint config to the task and tasks no longer fail on checkpoint errors.
+ * Please use CheckpointConfig.getTolerableCheckpointFailureNumber() to query the behavior on checkpoint errors.
*/
+ @Deprecated
@Internal
public boolean isFailTaskOnCheckpointError() {
return failTaskOnCheckpointError;
}
/**
- * This method is visible because of the way the configuration is
currently forwarded from the checkpoint config to
- * the task. This should not be called by the user, please use
CheckpointConfig.setFailOnCheckpointingErrors(...)
- * instead.
+ * @deprecated This method has no effect, since the configuration is no longer forwarded from the checkpoint config
+ * to the task and tasks no longer fail on checkpoint errors.
+ * Please use CheckpointConfig.setTolerableCheckpointFailureNumber(int) to determine the behavior on checkpoint errors.
*/
+ @Deprecated
@Internal
public void setFailTaskOnCheckpointError(boolean
failTaskOnCheckpointError) {
this.failTaskOnCheckpointError = failTaskOnCheckpointError;
diff --git
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 913d030..31dcfb8 100644
---
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -81,7 +81,7 @@ import static
org.apache.flink.streaming.tests.TestOperatorEnum.RESULT_TYPE_QUER
* <li>environment.checkpoint_interval (long, default - 1000): the
checkpoint interval.</li>
* <li>environment.externalize_checkpoint (boolean, default - false):
whether or not checkpoints should be externalized.</li>
* <li>environment.externalize_checkpoint.cleanup (String, default -
'retain'): Configures the cleanup mode for externalized checkpoints. Can be
'retain' or 'delete'.</li>
- * <li>environment.fail_on_checkpointing_errors (String, default - true):
Sets the expected behaviour for tasks in case that they encounter an error in
their checkpointing procedure.</li>
+ * <li>environment.tolerable_declined_checkpoint_number (int, default - 0): the number of declined checkpoints from tasks that the job manager tolerates before failing the job.</li>
* <li>environment.parallelism (int, default - 1): parallelism to use for
the job.</li>
* <li>environment.max_parallelism (int, default - 128): max parallelism
to use for the job</li>
* <li>environment.restart_strategy (String, default - 'fixed_delay'): The
failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
@@ -150,9 +150,9 @@ public class DataStreamAllroundTestJobFactory {
.key("environment.externalize_checkpoint.cleanup")
.defaultValue("retain");
- private static final ConfigOption<Boolean>
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS = ConfigOptions
- .key("environment.fail_on_checkpointing_errors")
- .defaultValue(true);
+ private static final ConfigOption<Integer>
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER = ConfigOptions
+ .key("environment.tolerable_declined_checkpoint_number ")
+ .defaultValue(0);
private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM =
ConfigOptions
.key("environment.parallelism")
@@ -272,12 +272,12 @@ public class DataStreamAllroundTestJobFactory {
throw new
IllegalArgumentException("Unknown clean up mode for externalized checkpoints: "
+ cleanupModeConfig);
}
env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
- }
- final boolean failOnCheckpointingErrors = pt.getBoolean(
- ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.key(),
-
ENVIRONMENT_FAIL_ON_CHECKPOINTING_ERRORS.defaultValue());
-
env.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
+ final int tolerableDeclinedCheckpointNumber = pt.getInt(
+
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.key(),
+
ENVIRONMENT_TOLERABLE_DECLINED_CHECKPOINT_NUMBER.defaultValue());
+
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(tolerableDeclinedCheckpointNumber);
+ }
}
private static void setupParallelism(final StreamExecutionEnvironment
env, final ParameterTool pt) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 4a95cdd..568e836 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -31,7 +31,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*/
public class CheckpointFailureManager {
- private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER =
Integer.MAX_VALUE;
+ public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER =
Integer.MAX_VALUE;
private final int tolerableCpFailureNumber;
private final FailJobCallback failureCallback;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index cff5777..74fcdf3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -65,7 +65,7 @@ public class CheckpointCoordinatorConfiguration implements
Serializable {
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
- boolean isPerfetCheckpointForRecovery,
+ boolean isPreferCheckpointForRecovery,
int tolerableCpFailureNumber) {
// sanity checks
@@ -81,7 +81,7 @@ public class CheckpointCoordinatorConfiguration implements
Serializable {
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.checkpointRetentionPolicy =
Preconditions.checkNotNull(checkpointRetentionPolicy);
this.isExactlyOnce = isExactlyOnce;
- this.isPreferCheckpointForRecovery =
isPerfetCheckpointForRecovery;
+ this.isPreferCheckpointForRecovery =
isPreferCheckpointForRecovery;
this.tolerableCheckpointFailureNumber =
tolerableCpFailureNumber;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index c2c3536..033f55a 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -23,7 +23,11 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static java.util.Objects.requireNonNull;
+import static
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -34,6 +38,8 @@ public class CheckpointConfig implements java.io.Serializable
{
private static final long serialVersionUID = -750378776078908147L;
+ private static final Logger LOG =
LoggerFactory.getLogger(CheckpointConfig.class);
+
/** The default checkpoint mode: exactly once. */
public static final CheckpointingMode DEFAULT_MODE =
CheckpointingMode.EXACTLY_ONCE;
@@ -46,6 +52,8 @@ public class CheckpointConfig implements java.io.Serializable
{
/** The default limit of concurrently happening checkpoints: one. */
public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+ public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+
//
------------------------------------------------------------------------
/** Checkpointing mode (exactly-once vs. at-least-once). */
@@ -69,14 +77,24 @@ public class CheckpointConfig implements
java.io.Serializable {
/** Cleanup behaviour for persistent checkpoints. */
private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
- /** Determines if a tasks are failed or not if there is an error in
their checkpointing. Default: true */
+ /**
+ * Whether checkpoint errors should fail the job. Tasks themselves no longer fail if there is an error in their checkpointing.
+ *
+ * <p>{@link #tolerableCheckpointFailureNumber} always takes precedence over this deprecated field if the two conflict.
+ *
+ * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+ */
+ @Deprecated
private boolean failOnCheckpointingErrors = true;
/** Determines if a job will fallback to checkpoint when there is a
more recent savepoint. **/
private boolean preferCheckpointForRecovery = false;
- /** Determines the threshold that we tolerance checkpoint failure
number. */
- private int tolerableCheckpointFailureNumber = 0;
+ /**
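+ * Determines how many declined checkpoint failures are tolerated.
+ * The default value of -1 ({@link #UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER}) means the threshold has not been set,
+ * neither via {@link #setTolerableCheckpointFailureNumber(int)} nor via the deprecated
+ * {@link #setFailOnCheckpointingErrors(boolean)}.
+ */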
+ private int tolerableCheckpointFailureNumber =
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;
//
------------------------------------------------------------------------
@@ -239,27 +257,57 @@ public class CheckpointConfig implements
java.io.Serializable {
}
/**
- * This determines the behaviour of tasks if there is an error in their
local checkpointing. If this returns true,
- * tasks will fail as a reaction. If this returns false, task will only
decline the failed checkpoint.
+ * This determines the behaviour when checkpoint errors occur.
+ * If this returns true, which corresponds to a tolerableCheckpointFailureNumber of zero, the job manager
+ * fails the whole job as soon as it receives a declined checkpoint message.
+ * If this returns false, which corresponds to a tolerableCheckpointFailureNumber of Integer.MAX_VALUE (unlimited),
+ * the job manager does not fail the whole job no matter how many declined checkpoints it receives.
+ * Note that this flag does not reflect a value set via {@link #setTolerableCheckpointFailureNumber(int)},
+ * which always takes precedence.
+ *
+ * @deprecated Use {@link #getTolerableCheckpointFailureNumber()}.
*/
+ @Deprecated
public boolean isFailOnCheckpointingErrors() {
return failOnCheckpointingErrors;
}
/**
- * Sets the expected behaviour for tasks in case that they encounter an
error in their checkpointing procedure.
- * If this is set to true, the task will fail on checkpointing error.
If this is set to false, the task will only
- * decline a the checkpoint and continue running. The default is true.
+ * Sets the expected behaviour in case tasks encounter an error when checkpointing.
+ * Setting this to true is equivalent to setting tolerableCheckpointFailureNumber to zero: the job manager
+ * fails the whole job as soon as it receives a declined checkpoint message.
+ * Setting this to false is equivalent to setting tolerableCheckpointFailureNumber to Integer.MAX_VALUE (unlimited):
+ * the job manager does not fail the whole job no matter how many declined checkpoints it receives.
+ *
+ * <p>{@link #setTolerableCheckpointFailureNumber(int)} always takes precedence over this deprecated method.
+ * Once the tolerable checkpoint failure number has been set (including by a previous call to this method),
+ * further calls to this method are ignored and only log a warning.
+ *
+ * @deprecated Use {@link #setTolerableCheckpointFailureNumber(int)}.
*/
+ @Deprecated
public void setFailOnCheckpointingErrors(boolean
failOnCheckpointingErrors) {
+ if (tolerableCheckpointFailureNumber !=
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
+ LOG.warn("Since tolerableCheckpointFailureNumber has
been configured as {}, deprecated #setFailOnCheckpointingErrors(boolean) " +
+ "method would not take any effect and please
use #setTolerableCheckpointFailureNumber(int) method to " +
+ "determine your expected behaviour when
checkpoint errors on task side.", tolerableCheckpointFailureNumber);
+ return;
+ }
this.failOnCheckpointingErrors = failOnCheckpointingErrors;
+ if (failOnCheckpointingErrors) {
+ this.tolerableCheckpointFailureNumber = 0;
+ } else {
+ this.tolerableCheckpointFailureNumber =
UNLIMITED_TOLERABLE_FAILURE_NUMBER;
+ }
}
/**
* Get the tolerable checkpoint failure number which used by the
checkpoint failure manager
* to determine when we need to fail the job.
+ *
+ * <p>If {@link #tolerableCheckpointFailureNumber} has not been configured, this method returns 0,
+ * which means the checkpoint failure manager does not tolerate any declined checkpoint.
*/
public int getTolerableCheckpointFailureNumber() {
+ if (tolerableCheckpointFailureNumber ==
UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
+ return 0;
+ }
return tolerableCheckpointFailureNumber;
}
@@ -268,6 +316,9 @@ public class CheckpointConfig implements
java.io.Serializable {
* we do not tolerance any checkpoint failure.
*/
public void setTolerableCheckpointFailureNumber(int
tolerableCheckpointFailureNumber) {
+ if (tolerableCheckpointFailureNumber < 0) {
+ throw new IllegalArgumentException("The tolerable
failure checkpoint number must be non-negative.");
+ }
this.tolerableCheckpointFailureNumber =
tolerableCheckpointFailureNumber;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e191dea..4c11fa3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -606,11 +605,7 @@ public class StreamingJobGraphGenerator {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();
long interval = cfg.getCheckpointInterval();
- if (interval >= 10) {
- ExecutionConfig executionConfig =
streamGraph.getExecutionConfig();
- // propagate the expected behaviour for checkpoint
errors to task.
-
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
- } else {
+ if (interval < 10) {
// interval of max value means disable periodic
checkpoint
interval = Long.MAX_VALUE;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 09d8bca..7d2eda5 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -32,6 +32,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -419,7 +421,7 @@ public abstract class AbstractStreamOperator<OUT>
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage,
snapshotException);
}
- throw new Exception(snapshotFailMessage,
snapshotException);
+ throw new CheckpointException(snapshotFailMessage,
CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
}
return snapshotInProgress;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
index 430f43e..64ab71c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerFactory.java
@@ -29,32 +29,12 @@ import org.apache.flink.util.Preconditions;
public class CheckpointExceptionHandlerFactory {
/**
- * Returns a {@link CheckpointExceptionHandler} that either causes a
task to fail completely or to just declines
- * checkpoint on exception, depending on the parameter flag.
+ * Returns a {@link CheckpointExceptionHandler} that only declines the checkpoint on exception.
*/
public CheckpointExceptionHandler createCheckpointExceptionHandler(
- boolean failTaskOnCheckpointException,
Environment environment) {
- if (failTaskOnCheckpointException) {
- return new FailingCheckpointExceptionHandler();
- } else {
- return new
DecliningCheckpointExceptionHandler(environment);
- }
- }
-
- /**
- * This handler makes the task fail by rethrowing a reported exception.
- */
- static final class FailingCheckpointExceptionHandler implements
CheckpointExceptionHandler {
-
- @Override
- public void tryHandleCheckpointException(
- CheckpointMetaData checkpointMetaData,
- Exception exception) throws Exception {
-
- throw exception;
- }
+ return new DecliningCheckpointExceptionHandler(environment);
}
/**
@@ -71,7 +51,7 @@ public class CheckpointExceptionHandlerFactory {
@Override
public void tryHandleCheckpointException(
CheckpointMetaData checkpointMetaData,
- Exception exception) throws Exception {
+ Exception exception) {
environment.declineCheckpoint(checkpointMetaData.getCheckpointId(), exception);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3927e46..e5f56b2 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -183,10 +183,7 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
private ExecutorService asyncOperationsThreadPool;
/** Handler for exceptions during checkpointing in the stream task.
Used in synchronous part of the checkpoint. */
- private CheckpointExceptionHandler
synchronousCheckpointExceptionHandler;
-
- /** Wrapper for synchronousCheckpointExceptionHandler to deal with
rethrown exceptions. Used in the async part. */
- private AsyncCheckpointExceptionHandler
asynchronousCheckpointExceptionHandler;
+ private CheckpointExceptionHandler checkpointExceptionHandler;
private final
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters;
@@ -323,11 +320,8 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
CheckpointExceptionHandlerFactory
cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
- synchronousCheckpointExceptionHandler =
cpExceptionHandlerFactory.createCheckpointExceptionHandler(
-
getExecutionConfig().isFailTaskOnCheckpointError(),
- getEnvironment());
-
- asynchronousCheckpointExceptionHandler = new
AsyncCheckpointExceptionHandler(this);
+ checkpointExceptionHandler = cpExceptionHandlerFactory
+
.createCheckpointExceptionHandler(getEnvironment());
stateBackend = createStateBackend();
checkpointStorage =
stateBackend.createCheckpointStorage(getEnvironment().getJobID());
@@ -1062,9 +1056,12 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
// We only report the exception for the
original cause of fail and cleanup.
// Otherwise this followup exception
could race the original exception in failing the task.
-
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
- checkpointMetaData,
- checkpointException);
+ try {
+
owner.checkpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
checkpointException);
+ } catch (Exception unhandled) {
+ AsynchronousException
asyncException = new AsynchronousException(unhandled);
+
owner.handleAsyncException("Failure in asynchronous checkpoint
materialization", asyncException);
+ }
currentState =
CheckpointingOperation.AsyncCheckpointState.DISCARDED;
} else {
@@ -1227,7 +1224,7 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
// operation, and without the failure,
the task would go back to normal execution.
throw ex;
} else {
-
owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
ex);
+
owner.checkpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
ex);
}
}
}
@@ -1252,36 +1249,6 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
}
}
- /**
- * Wrapper for synchronous {@link CheckpointExceptionHandler}. This
implementation catches unhandled, rethrown
- * exceptions and reports them through {@link
#handleAsyncException(String, Throwable)}. As this implementation
- * always handles the exception in some way, it never rethrows.
- */
- static final class AsyncCheckpointExceptionHandler implements
CheckpointExceptionHandler {
-
- /** Owning stream task to which we report async exceptions. */
- final StreamTask<?, ?> owner;
-
- /** Synchronous exception handler to which we delegate. */
- final CheckpointExceptionHandler
synchronousCheckpointExceptionHandler;
-
- AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
- this.owner = Preconditions.checkNotNull(owner);
- this.synchronousCheckpointExceptionHandler =
-
Preconditions.checkNotNull(owner.synchronousCheckpointExceptionHandler);
- }
-
- @Override
- public void tryHandleCheckpointException(CheckpointMetaData
checkpointMetaData, Exception exception) {
- try {
-
synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData,
exception);
- } catch (Exception unhandled) {
- AsynchronousException asyncException = new
AsynchronousException(unhandled);
- owner.handleAsyncException("Failure in
asynchronous checkpoint materialization", asyncException);
- }
- }
- }
-
@VisibleForTesting
public static <OUT>
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
createRecordWriters(
StreamConfig configuration,
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
index 17ab88f..33a3dc8 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerConfigurationTest.java
@@ -18,17 +18,11 @@
package org.apache.flink.streaming.runtime.tasks;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -40,82 +34,47 @@ import org.junit.Test;
public class CheckpointExceptionHandlerConfigurationTest extends TestLogger {
@Test
- public void testConfigurationFailOnException() throws Exception {
- testConfigForwarding(true);
- }
-
- @Test
- public void testConfigurationDeclineOnException() throws Exception {
- testConfigForwarding(false);
+ public void testCheckpointConfigDefault() {
+ StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
+ CheckpointConfig checkpointConfig =
streamExecutionEnvironment.getCheckpointConfig();
+
Assert.assertTrue(checkpointConfig.isFailOnCheckpointingErrors());
+ Assert.assertEquals(0,
checkpointConfig.getTolerableCheckpointFailureNumber());
}
@Test
- public void testFailIsDefaultConfig() {
- ExecutionConfig newExecutionConfig = new ExecutionConfig();
-
Assert.assertTrue(newExecutionConfig.isFailTaskOnCheckpointError());
- }
-
- private void testConfigForwarding(boolean failOnException) throws
Exception {
-
- final boolean expectedHandlerFlag = failOnException;
-
- final DummyEnvironment environment = new
DummyEnvironment("test", 1, 0);
- environment.setTaskStateManager(new TestTaskStateManager());
-
environment.getExecutionConfig().setFailTaskOnCheckpointError(expectedHandlerFlag);
-
- final CheckpointExceptionHandlerFactory inspectingFactory = new
CheckpointExceptionHandlerFactory() {
-
- @Override
- public CheckpointExceptionHandler
createCheckpointExceptionHandler(
- boolean failTaskOnCheckpointException,
- Environment environment) {
-
- Assert.assertEquals(expectedHandlerFlag,
failTaskOnCheckpointException);
- return
super.createCheckpointExceptionHandler(failTaskOnCheckpointException,
environment);
- }
- };
-
- StreamTask streamTask = new StreamTask(environment, null) {
- @Override
- protected void init() throws Exception {}
-
- @Override
- protected void performDefaultAction(ActionContext
context) throws Exception {
- context.allActionsCompleted();
- }
+ public void testSetCheckpointConfig() {
+ StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
+ CheckpointConfig checkpointConfig =
streamExecutionEnvironment.getCheckpointConfig();
- @Override
- protected void cleanup() throws Exception {}
+ // use the deprecated API to disable failing on checkpoint errors
+ checkpointConfig.setFailOnCheckpointingErrors(false);
+
Assert.assertFalse(checkpointConfig.isFailOnCheckpointingErrors());
+
Assert.assertEquals(CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER,
checkpointConfig.getTolerableCheckpointFailureNumber());
- @Override
- protected void cancelTask() throws Exception {}
+ // use new API to set tolerable declined checkpoint number
+ checkpointConfig.setTolerableCheckpointFailureNumber(5);
+ Assert.assertEquals(5,
checkpointConfig.getTolerableCheckpointFailureNumber());
- @Override
- protected CheckpointExceptionHandlerFactory
createCheckpointExceptionHandlerFactory() {
- return inspectingFactory;
- }
- };
-
- streamTask.invoke();
+ // once the tolerable declined checkpoint number has been configured, the deprecated API no longer takes effect
+ checkpointConfig.setFailOnCheckpointingErrors(true);
+ Assert.assertEquals(5,
checkpointConfig.getTolerableCheckpointFailureNumber());
}
@Test
- public void testCheckpointConfigDefault() throws Exception {
- StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
-
Assert.assertTrue(streamExecutionEnvironment.getCheckpointConfig().isFailOnCheckpointingErrors());
+ public void testPropagationFailFromCheckpointConfig() {
+ try {
+ doTestPropagationFromCheckpointConfig(true);
+ } catch (IllegalArgumentException ignored) {
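+ // NOTE(review): nothing visible in doTestPropagationFromCheckpointConfig throws IllegalArgumentException; confirm why it is caught here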
+ }
}
@Test
- public void testPropagationFailFromCheckpointConfig() throws Exception {
- doTestPropagationFromCheckpointConfig(true);
- }
-
- @Test
- public void testPropagationDeclineFromCheckpointConfig() throws
Exception {
+ public void testPropagationDeclineFromCheckpointConfig() {
doTestPropagationFromCheckpointConfig(false);
}
- public void doTestPropagationFromCheckpointConfig(boolean
failTaskOnCheckpointErrors) throws Exception {
+ public void doTestPropagationFromCheckpointConfig(boolean
failTaskOnCheckpointErrors) {
StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setParallelism(1);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointInterval(1000);
@@ -123,7 +82,7 @@ public class CheckpointExceptionHandlerConfigurationTest
extends TestLogger {
streamExecutionEnvironment.addSource(new
SourceFunction<Integer>() {
@Override
- public void run(SourceContext<Integer> ctx) throws
Exception {
+ public void run(SourceContext<Integer> ctx) {
}
@Override
@@ -131,13 +90,5 @@ public class CheckpointExceptionHandlerConfigurationTest
extends TestLogger {
}
}).addSink(new DiscardingSink<>());
-
- StreamGraph streamGraph =
streamExecutionEnvironment.getStreamGraph();
- JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
- SerializedValue<ExecutionConfig> serializedExecutionConfig =
jobGraph.getSerializedExecutionConfig();
- ExecutionConfig executionConfig =
-
serializedExecutionConfig.deserializeValue(Thread.currentThread().getContextClassLoader());
-
- Assert.assertEquals(failTaskOnCheckpointErrors,
executionConfig.isFailTaskOnCheckpointError());
}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
index 2f58162..2632c01 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointExceptionHandlerTest.java
@@ -31,30 +31,11 @@ import org.junit.Test;
public class CheckpointExceptionHandlerTest extends TestLogger {
@Test
- public void testRethrowingHandler() {
- DeclineDummyEnvironment environment = new
DeclineDummyEnvironment();
- CheckpointExceptionHandlerFactory
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
- CheckpointExceptionHandler exceptionHandler =
-
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true,
environment);
-
- CheckpointMetaData failedCheckpointMetaData = new
CheckpointMetaData(42L, 4711L);
- Exception testException = new Exception("test");
- try {
-
exceptionHandler.tryHandleCheckpointException(failedCheckpointMetaData,
testException);
- Assert.fail("Exception not rethrown.");
- } catch (Exception e) {
- Assert.assertEquals(testException, e);
- }
-
- Assert.assertNull(environment.getLastDeclinedCheckpointCause());
- }
-
- @Test
public void testDecliningHandler() {
DeclineDummyEnvironment environment = new
DeclineDummyEnvironment();
CheckpointExceptionHandlerFactory
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
CheckpointExceptionHandler exceptionHandler =
-
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(false,
environment);
+
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(environment);
CheckpointMetaData failedCheckpointMetaData = new
CheckpointMetaData(42L, 4711L);
Exception testException = new Exception("test");
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index efc2505..e2171b7 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -310,16 +310,16 @@ public class StreamTaskTest extends TestLogger {
}
@Test
- public void testFailingCheckpointStreamOperator() throws Exception {
+ public void testDecliningCheckpointStreamOperator() throws Exception {
final long checkpointId = 42L;
final long timestamp = 1L;
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
- Environment mockEnvironment = new
MockEnvironmentBuilder().build();
+ CheckpointExceptionHandlerTest.DeclineDummyEnvironment
declineDummyEnvironment = new
CheckpointExceptionHandlerTest.DeclineDummyEnvironment();
- StreamTask<?, ?> streamTask = new
EmptyStreamTask(mockEnvironment);
+ StreamTask<?, ?> streamTask = new
EmptyStreamTask(declineDummyEnvironment);
CheckpointMetaData checkpointMetaData = new
CheckpointMetaData(checkpointId, timestamp);
// mock the operators
@@ -360,19 +360,11 @@ public class StreamTaskTest extends TestLogger {
CheckpointExceptionHandlerFactory
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
CheckpointExceptionHandler checkpointExceptionHandler =
-
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true,
mockEnvironment);
- Whitebox.setInternalState(streamTask,
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
- StreamTask.AsyncCheckpointExceptionHandler
asyncCheckpointExceptionHandler =
- new
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
- Whitebox.setInternalState(streamTask,
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
-
- try {
- streamTask.triggerCheckpoint(checkpointMetaData,
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
- fail("Expected test exception here.");
- } catch (Exception e) {
- assertEquals(testException, e.getCause());
- }
+
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(declineDummyEnvironment);
+ Whitebox.setInternalState(streamTask,
"checkpointExceptionHandler", checkpointExceptionHandler);
+
+ streamTask.triggerCheckpoint(checkpointMetaData,
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
+ assertEquals(testException,
declineDummyEnvironment.getLastDeclinedCheckpointCause());
verify(operatorSnapshotResult1).cancel();
verify(operatorSnapshotResult2).cancel();
@@ -432,12 +424,8 @@ public class StreamTaskTest extends TestLogger {
CheckpointExceptionHandlerFactory
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
CheckpointExceptionHandler checkpointExceptionHandler =
-
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true,
mockEnvironment);
- Whitebox.setInternalState(streamTask,
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
-
- StreamTask.AsyncCheckpointExceptionHandler
asyncCheckpointExceptionHandler =
- new
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
- Whitebox.setInternalState(streamTask,
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(mockEnvironment);
+ Whitebox.setInternalState(streamTask,
"checkpointExceptionHandler", checkpointExceptionHandler);
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
streamTask.triggerCheckpoint(checkpointMetaData,
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index c02ff99..589b64c 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -99,7 +99,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
@@ -122,27 +121,12 @@ public class TaskCheckpointingBehaviourTest extends
TestLogger {
}
@Test
- public void testTaskFailingOnCheckpointErrorInSyncPart() throws
Exception {
- Throwable failureCause =
runTestTaskFailingOnCheckpointError(new SyncFailureInducingStateBackend());
- assertNotNull(failureCause);
-
- String expectedMessageStart = "Could not perform checkpoint";
- assertEquals(expectedMessageStart,
failureCause.getMessage().substring(0, expectedMessageStart.length()));
- }
-
- @Test
- public void testTaskFailingOnCheckpointErrorInAsyncPart() throws
Exception {
- Throwable failureCause =
runTestTaskFailingOnCheckpointError(new AsyncFailureInducingStateBackend());
- assertEquals(AsynchronousException.class,
failureCause.getClass());
- }
-
- @Test
public void testBlockingNonInterruptibleCheckpoint() throws Exception {
StateBackend lockingStateBackend = new
BackendForTestStream(LockingOutputStream::new);
Task task =
- createTask(new TestOperator(), lockingStateBackend,
mock(CheckpointResponder.class), true);
+ createTask(new TestOperator(), lockingStateBackend,
mock(CheckpointResponder.class));
// start the task and wait until it is in "restore"
task.startTaskThread();
@@ -162,7 +146,7 @@ public class TaskCheckpointingBehaviourTest extends
TestLogger {
TestDeclinedCheckpointResponder checkpointResponder = new
TestDeclinedCheckpointResponder();
Task task =
- createTask(new FilterOperator(), backend,
checkpointResponder, false);
+ createTask(new FilterOperator(), backend,
checkpointResponder);
// start the task and wait until it is in "restore"
task.startTaskThread();
@@ -175,20 +159,6 @@ public class TaskCheckpointingBehaviourTest extends
TestLogger {
task.getExecutingThread().join();
}
- private Throwable
runTestTaskFailingOnCheckpointError(AbstractStateBackend backend) throws
Exception {
-
- Task task =
- createTask(new FilterOperator(), backend,
mock(CheckpointResponder.class), true);
-
- // start the task and wait until it is in "restore"
- task.startTaskThread();
-
- task.getExecutingThread().join();
-
- assertEquals(ExecutionState.FAILED, task.getExecutionState());
- return task.getFailureCause();
- }
-
//
------------------------------------------------------------------------
// Utilities
//
------------------------------------------------------------------------
@@ -196,8 +166,7 @@ public class TaskCheckpointingBehaviourTest extends
TestLogger {
private static Task createTask(
StreamOperator<?> op,
StateBackend backend,
- CheckpointResponder checkpointResponder,
- boolean failOnCheckpointErrors) throws IOException {
+ CheckpointResponder checkpointResponder) throws IOException {
Configuration taskConfig = new Configuration();
StreamConfig cfg = new StreamConfig(taskConfig);
@@ -206,7 +175,6 @@ public class TaskCheckpointingBehaviourTest extends
TestLogger {
cfg.setStateBackend(backend);
ExecutionConfig executionConfig = new ExecutionConfig();
-
executionConfig.setFailTaskOnCheckpointError(failOnCheckpointErrors);
JobInformation jobInformation = new JobInformation(
new JobID(),