[FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream job
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aec4496e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aec4496e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aec4496e Branch: refs/heads/release-1.5 Commit: aec4496ebe1679f74b76e9eaa2f31d00d8c82447 Parents: ef6e40f Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Fri May 11 11:51:12 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:49:11 2018 +0800 ---------------------------------------------------------------------- .../tests/DataStreamAllroundTestJobFactory.java | 52 +++++++++++++ .../tests/DataStreamAllroundTestProgram.java | 10 +++ .../tests/ExceptionThrowingFailureMapper.java | 79 ++++++++++++++++++++ 3 files changed, 141 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index c2e4cf5..05bbc77 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -54,6 +54,13 @@ import java.util.List; * <p>Program parameters: * <ul> * <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. 
Can be 'exactly-once' or 'at-least-once'.</li> + * <li>test.simulate_failure (boolean, default - false): This configures whether or not to simulate failures by throwing exceptions within the job.</li> + * <li>test.simulate_failure.num_records (long, default - 100L): The number of records to process before throwing an exception, per job execution attempt. + * Only relevant if configured to simulate failures.</li> + * <li>test.simulate_failure.num_checkpoints (long, default - 1L): The number of complete checkpoints before throwing an exception, per job execution attempt. + * Only relevant if configured to simulate failures.</li> + * <li>test.simulate_failure.max_failures (int, default - 1): The maximum number of times to fail the job. This also takes into account failures that + * were not triggered by the job's own failure simulation, e.g. TaskManager or JobManager failures. Only relevant if configured to simulate failures.</li> * <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li> * <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li> * <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li> @@ -78,6 +85,33 @@ class DataStreamAllroundTestJobFactory { .defaultValue("exactly-once") .withDescription("This configures the semantics to test. 
Can be 'exactly-once' or 'at-least-once'"); + private static final ConfigOption<Boolean> TEST_SIMULATE_FAILURE = ConfigOptions + .key("test.simulate_failure") + .defaultValue(false) + .withDescription("This configures whether or not to simulate failures by throwing exceptions within the job."); + + private static final ConfigOption<Long> TEST_SIMULATE_FAILURE_NUM_RECORDS = ConfigOptions + .key("test.simulate_failure.num_records") + .defaultValue(100L) + .withDescription( + "The number of records to process before throwing an exception, per job execution attempt." + + " Only relevant if configured to simulate failures."); + + private static final ConfigOption<Long> TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS = ConfigOptions + .key("test.simulate_failure.num_checkpoints") + .defaultValue(1L) + .withDescription( + "The number of complete checkpoints before throwing an exception, per job execution attempt." + + " Only relevant if configured to simulate failures."); + + private static final ConfigOption<Integer> TEST_SIMULATE_FAILURE_MAX_FAILURES = ConfigOptions + .key("test.simulate_failure.max_failures") + .defaultValue(1) + .withDescription( + "The maximum number of times to fail the job. This also takes into account failures that were not triggered" + + " by the job's own failure simulation, e.g. TaskManager or JobManager failures." 
+ + " Only relevant if configured to simulate failures."); + private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions .key("environment.checkpoint_interval") .defaultValue(1000L); @@ -273,6 +307,24 @@ class DataStreamAllroundTestJobFactory { return new SemanticsCheckMapper(validatorFunction); } + static boolean isSimulateFailures(ParameterTool pt) { + return pt.getBoolean(TEST_SIMULATE_FAILURE.key(), TEST_SIMULATE_FAILURE.defaultValue()); + } + + static MapFunction<Event, Event> createExceptionThrowingFailureMapper(ParameterTool pt) { + return new ExceptionThrowingFailureMapper<>( + pt.getLong( + TEST_SIMULATE_FAILURE_NUM_RECORDS.key(), + TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue()), + pt.getLong( + TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS.key(), + TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS.defaultValue()), + pt.getInt( + TEST_SIMULATE_FAILURE_MAX_FAILURES.key(), + TEST_SIMULATE_FAILURE_MAX_FAILURES.defaultValue())); + + } + static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper( MapFunction<IN, OUT> mapFunction, JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState, http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index 5ae1d16..afbc01a 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -32,8 +32,10 @@ import java.util.Collections; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; /** @@ -56,6 +58,7 @@ public class DataStreamAllroundTestProgram { private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper"; private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper"; private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper"; + private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper"; public static void main(String[] args) throws Exception { final ParameterTool pt = ParameterTool.fromArgs(args); @@ -89,6 +92,13 @@ public class DataStreamAllroundTestProgram { .name(OPERATOR_STATE_OPER_NAME) .returns(Event.class); + if (isSimulateFailures(pt)) { + eventStream2 = eventStream2 + .map(createExceptionThrowingFailureMapper(pt)) + .setParallelism(1) + .name(FAILURE_MAPPER_NAME); + } + eventStream2.keyBy(Event::getKey) .flatMap(createSemanticsCheckMapper(pt)) .name(SEMANTICS_CHECK_MAPPER_NAME) 
http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java new file mode 100644 index 0000000..d758ef5 --- /dev/null +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.runtime.state.CheckpointListener; + +/** + * This mapper simulates failure by throwing exceptions. The timing to throw an + * exception can be set by configuring the number of records to process, and number of + * complete checkpoints to be acknowledged before throwing the exception. 
+ * + * <p>The total times to simulate a failure across multiple execution attempts + * of the operator can also be configured. Note that this also takes into account + * failures that were not triggered by this mapper, e.g. TaskManager failures. + */ +public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener { + + private static final long serialVersionUID = -5286927943454740016L; + + private final long numProcessedRecordsFailureThreshold; + private final long numCompleteCheckpointsFailureThreshold; + private final int maxNumFailures; + + private long numProcessedRecords; + private long numCompleteCheckpoints; + + public ExceptionThrowingFailureMapper( + long numProcessedRecordsFailureThreshold, + long numCompleteCheckpointsFailureThreshold, + int maxNumFailures) { + + this.numProcessedRecordsFailureThreshold = numProcessedRecordsFailureThreshold; + this.numCompleteCheckpointsFailureThreshold = numCompleteCheckpointsFailureThreshold; + this.maxNumFailures = maxNumFailures; + } + + @Override + public T map(T value) throws Exception { + numProcessedRecords++; + + if (isReachedFailureThreshold()) { + throw new Exception("Artificial failure."); + } + + return value; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + numCompleteCheckpoints++; + + if (isReachedFailureThreshold()) { + throw new Exception("Artificial failure."); + } + } + + private boolean isReachedFailureThreshold() { + return numProcessedRecords >= numProcessedRecordsFailureThreshold + && numCompleteCheckpoints >= numCompleteCheckpointsFailureThreshold + && getRuntimeContext().getAttemptNumber() < maxNumFailures; + } +}
