[FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aec4496e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aec4496e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aec4496e

Branch: refs/heads/release-1.5
Commit: aec4496ebe1679f74b76e9eaa2f31d00d8c82447
Parents: ef6e40f
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri May 11 11:51:12 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 16:49:11 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 52 +++++++++++++
 .../tests/DataStreamAllroundTestProgram.java    | 10 +++
 .../tests/ExceptionThrowingFailureMapper.java   | 79 ++++++++++++++++++++
 3 files changed, 141 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index c2e4cf5..05bbc77 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -54,6 +54,13 @@ import java.util.List;
  * <p>Program parameters:
  * <ul>
  *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ *     <li>test.simulate_failure (boolean, default - false): This configures whether or not to simulate failures by throwing exceptions within the job.</li>
+ *     <li>test.simulate_failure.num_records (long, default - 100L): The number of records to process before throwing an exception, per job execution attempt.
+ *         Only relevant if configured to simulate failures.</li>
+ *     <li>test.simulate_failure.num_checkpoints (long, default - 1L): The number of complete checkpoints before throwing an exception, per job execution attempt.
+ *         Only relevant if configured to simulate failures.</li>
+ *     <li>test.simulate_failure.max_failures (int, default - 1): The maximum number of times to fail the job. This also takes into account failures that
+ *         were not triggered by the job's own failure simulation, e.g. TaskManager or JobManager failures. Only relevant if configured to simulate failures.</li>
  *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
  *     <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li>
  *     <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
@@ -78,6 +85,33 @@ class DataStreamAllroundTestJobFactory {
                .defaultValue("exactly-once")
                .withDescription("This configures the semantics to test. Can be 
'exactly-once' or 'at-least-once'");
 
+	// ------------------------------------------------------------------------
+	//  Options for the optional failure simulation (see ExceptionThrowingFailureMapper).
+	//  TEST_SIMULATE_FAILURE toggles it; the remaining options tune when and how often
+	//  the artificial failure is thrown.
+	// ------------------------------------------------------------------------
+
+	/** Description suffix shared by every option that only matters when failures are simulated. */
+	private static final String ONLY_RELEVANT_WHEN_SIMULATING_FAILURES =
+		" Only relevant if configured to simulate failures.";
+
+	private static final ConfigOption<Boolean> TEST_SIMULATE_FAILURE = ConfigOptions
+		.key("test.simulate_failure")
+		.defaultValue(false)
+		.withDescription("This configures whether or not to simulate failures by throwing exceptions within the job.");
+
+	private static final ConfigOption<Long> TEST_SIMULATE_FAILURE_NUM_RECORDS = ConfigOptions
+		.key("test.simulate_failure.num_records")
+		.defaultValue(100L)
+		.withDescription("The number of records to process before throwing an exception, per job execution attempt."
+			+ ONLY_RELEVANT_WHEN_SIMULATING_FAILURES);
+
+	private static final ConfigOption<Long> TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS = ConfigOptions
+		.key("test.simulate_failure.num_checkpoints")
+		.defaultValue(1L)
+		.withDescription("The number of complete checkpoints before throwing an exception, per job execution attempt."
+			+ ONLY_RELEVANT_WHEN_SIMULATING_FAILURES);
+
+	private static final ConfigOption<Integer> TEST_SIMULATE_FAILURE_MAX_FAILURES = ConfigOptions
+		.key("test.simulate_failure.max_failures")
+		.defaultValue(1)
+		.withDescription("The maximum number of times to fail the job. This also takes into account failures that"
+			+ " were not triggered by the job's own failure simulation, e.g. TaskManager or JobManager failures."
+			+ ONLY_RELEVANT_WHEN_SIMULATING_FAILURES);
+
 	private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
 		.key("environment.checkpoint_interval")
 		.defaultValue(1000L);
@@ -273,6 +307,24 @@ class DataStreamAllroundTestJobFactory {
                return new SemanticsCheckMapper(validatorFunction);
        }
 
+	/**
+	 * Reads the {@code test.simulate_failure} flag, falling back to the option's default
+	 * ({@code false}) when the parameter is absent.
+	 *
+	 * @param pt the job's program parameters
+	 * @return whether failures should be simulated by the job
+	 */
+	static boolean isSimulateFailures(ParameterTool pt) {
+		final String flagKey = TEST_SIMULATE_FAILURE.key();
+		final boolean flagFallback = TEST_SIMULATE_FAILURE.defaultValue();
+		return pt.getBoolean(flagKey, flagFallback);
+	}
+
+	/**
+	 * Builds the failure-simulating mapper from the {@code test.simulate_failure.num_records},
+	 * {@code test.simulate_failure.num_checkpoints} and {@code test.simulate_failure.max_failures}
+	 * parameters, using each option's default when a parameter is absent.
+	 *
+	 * @param pt the job's program parameters
+	 * @return a mapper that forwards events unchanged until it throws its artificial failure
+	 */
+	static MapFunction<Event, Event> createExceptionThrowingFailureMapper(ParameterTool pt) {
+		final long recordsBeforeFailure = pt.getLong(
+			TEST_SIMULATE_FAILURE_NUM_RECORDS.key(),
+			TEST_SIMULATE_FAILURE_NUM_RECORDS.defaultValue());
+		final long checkpointsBeforeFailure = pt.getLong(
+			TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS.key(),
+			TEST_SIMULATE_FAILURE_NUM_CHECKPOINTS.defaultValue());
+		final int failureBudget = pt.getInt(
+			TEST_SIMULATE_FAILURE_MAX_FAILURES.key(),
+			TEST_SIMULATE_FAILURE_MAX_FAILURES.defaultValue());
+
+		return new ExceptionThrowingFailureMapper<>(recordsBeforeFailure, checkpointsBeforeFailure, failureBudget);
+	}
+
        static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> 
createArtificialKeyedStateMapper(
                MapFunction<IN, OUT> mapFunction,
                JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,

http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 5ae1d16..afbc01a 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -32,8 +32,10 @@ import java.util.Collections;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialOperatorStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createExceptionThrowingFailureMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
 
 /**
@@ -56,6 +58,7 @@ public class DataStreamAllroundTestProgram {
 	private static final String KEYED_STATE_OPER_NAME = "ArtificalKeyedStateMapper";
 	private static final String OPERATOR_STATE_OPER_NAME = "ArtificalOperatorStateMapper";
 	private static final String SEMANTICS_CHECK_MAPPER_NAME = "SemanticsCheckMapper";
+	/** Operator name of the failure-simulating mapper, which is only added to the job when failure simulation is enabled. */
+	private static final String FAILURE_MAPPER_NAME = "ExceptionThrowingFailureMapper";
 
        public static void main(String[] args) throws Exception {
                final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -89,6 +92,13 @@ public class DataStreamAllroundTestProgram {
                        .name(OPERATOR_STATE_OPER_NAME)
                        .returns(Event.class);
 
+               if (isSimulateFailures(pt)) {
+                       eventStream2 = eventStream2
+                               .map(createExceptionThrowingFailureMapper(pt))
+                               .setParallelism(1)
+                               .name(FAILURE_MAPPER_NAME);
+               }
+
                eventStream2.keyBy(Event::getKey)
                        .flatMap(createSemanticsCheckMapper(pt))
                        .name(SEMANTICS_CHECK_MAPPER_NAME)

http://git-wip-us.apache.org/repos/asf/flink/blob/aec4496e/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
new file mode 100644
index 0000000..d758ef5
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/ExceptionThrowingFailureMapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.runtime.state.CheckpointListener;
+
+/**
+ * This mapper simulates failure by throwing exceptions. When the exception is thrown is
+ * controlled by two thresholds: the number of records to process and the number of
+ * completed checkpoints to be acknowledged. The exception is thrown as soon as both
+ * thresholds have been reached.
+ *
+ * <p>The total number of times to simulate a failure across multiple execution attempts
+ * of the operator can also be configured. Note that this also takes into account
+ * failures that were not triggered by this mapper, e.g. TaskManager failures, because
+ * the limit is checked against the task's execution attempt number.
+ *
+ * @param <T> type of the records, which are forwarded unchanged
+ */
+public class ExceptionThrowingFailureMapper<T> extends RichMapFunction<T, T> implements CheckpointListener {
+
+	private static final long serialVersionUID = -5286927943454740016L;
+
+	/** Minimum number of records this instance must process before a failure may be thrown. */
+	private final long numProcessedRecordsFailureThreshold;
+
+	/** Minimum number of checkpoint-complete notifications before a failure may be thrown. */
+	private final long numCompleteCheckpointsFailureThreshold;
+
+	/** Failures are only thrown while the task's execution attempt number is below this value. */
+	private final int maxNumFailures;
+
+	// Plain counters, not part of any checkpointed state; the thresholds above are therefore
+	// meant to apply per execution attempt.
+	private long numProcessedRecords;
+	private long numCompleteCheckpoints;
+
+	/**
+	 * Creates a new failure-simulating mapper.
+	 *
+	 * @param numProcessedRecordsFailureThreshold number of records this instance must process
+	 *     before it may fail
+	 * @param numCompleteCheckpointsFailureThreshold number of checkpoint-complete notifications
+	 *     this instance must receive before it may fail
+	 * @param maxNumFailures failures are only simulated while the execution attempt number
+	 *     is below this value
+	 */
+	public ExceptionThrowingFailureMapper(
+			long numProcessedRecordsFailureThreshold,
+			long numCompleteCheckpointsFailureThreshold,
+			int maxNumFailures) {
+
+		this.numProcessedRecordsFailureThreshold = numProcessedRecordsFailureThreshold;
+		this.numCompleteCheckpointsFailureThreshold = numCompleteCheckpointsFailureThreshold;
+		this.maxNumFailures = maxNumFailures;
+	}
+
+	/**
+	 * Counts the record and forwards it unchanged. If counting it reaches the failure
+	 * thresholds, the record is not forwarded; the artificial exception is thrown instead.
+	 */
+	@Override
+	public T map(T value) throws Exception {
+		numProcessedRecords++;
+		throwIfFailureThresholdReached();
+		return value;
+	}
+
+	/**
+	 * Counts the checkpoint-complete notification. Throws the artificial exception if the
+	 * failure thresholds are now reached.
+	 */
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		numCompleteCheckpoints++;
+		throwIfFailureThresholdReached();
+	}
+
+	/** Shared by {@link #map} and {@link #notifyCheckpointComplete} so both fail identically. */
+	private void throwIfFailureThresholdReached() throws Exception {
+		if (isReachedFailureThreshold()) {
+			throw new Exception("Artificial failure.");
+		}
+	}
+
+	/**
+	 * Returns true once both the record and checkpoint thresholds are met, but only while the
+	 * current execution attempt number is below {@link #maxNumFailures}. The cheap counter
+	 * checks come first, so the runtime context is only consulted once both are met.
+	 */
+	private boolean isReachedFailureThreshold() {
+		return numProcessedRecords >= numProcessedRecordsFailureThreshold
+			&& numCompleteCheckpoints >= numCompleteCheckpointsFailureThreshold
+			&& getRuntimeContext().getAttemptNumber() < maxNumFailures;
+	}
+}

Reply via email to