[FLINK-8910][e2e] Automated test for local recovery (including sticky allocation)
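Local recovery lets a restarted task restore its state from a copy kept on the TaskManager's local disk instead of
downloading it again from the distributed checkpoint store. This only pays off when a failed task is rescheduled to
its previous allocation (sticky allocation), which is exactly what the new test verifies. As a rough sketch, the
feature is driven by the configuration keys that the test script below sets:

    state.backend.local-recovery: true
    taskmanager.numberOfTaskSlots: 1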
This closes #5676. (cherry picked from commit 489e428) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da5b6d7a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da5b6d7a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da5b6d7a Branch: refs/heads/release-1.5 Commit: da5b6d7adcccb2531b6678be9aa127ebb0c15be1 Parents: fbe3cbf Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Tue Mar 6 10:35:44 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu May 17 10:10:56 2018 +0200 ---------------------------------------------------------------------- .../pom.xml | 96 ++++ ...StickyAllocationAndLocalRecoveryTestJob.java | 478 +++++++++++++++++++ .../src/main/resources/log4j-test.properties | 27 ++ flink-end-to-end-tests/pom.xml | 1 + flink-end-to-end-tests/run-nightly-tests.sh | 8 + flink-end-to-end-tests/test-scripts/common.sh | 43 +- .../test_local_recovery_and_scheduling.sh | 121 +++++ .../TaskExecutorLocalStateStoresManager.java | 2 +- .../api/operators/BackendRestorerProcedure.java | 2 + 9 files changed, 773 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml new file mode 100644 index 0000000..8ba7ef5 --- /dev/null +++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-local-recovery-and-allocation-test</artifactId> + <name>flink-local-recovery-and-allocation-test</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <!-- StickyAllocationAndLocalRecoveryTestJob --> + <execution> + <id>StickyAllocationAndLocalRecoveryTestJob</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <finalName>StickyAllocationAndLocalRecoveryTestJob</finalName> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob</program-class> + </manifestEntries> + </archive> + + <includes> + <include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.class</include> + <include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$*.class</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java new file mode 100644 index 0000000..b03791e --- /dev/null +++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * Automatic end-to-end test for local recovery (including sticky allocation). 
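+ *
+ * <p>On every restart, each mapper subtask checks that it was rescheduled to the allocation it ran on before the
+ * failure (unless its previous JVM was deliberately killed), which is the precondition for recovering from local
+ * state.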
+ *
+ * <p>List of possible input parameters for this job:
+ * <ul>
+ * <li>checkpointDir: the checkpoint directory, required.</li>
+ * <li>parallelism: the parallelism of the job, default 1.</li>
+ * <li>maxParallelism: the maximum parallelism of the job, default 1.</li>
+ * <li>checkpointInterval: the checkpointing interval in milliseconds, default 1000.</li>
+ * <li>restartDelay: the delay of the fixed delay restart strategy, default 0.</li>
+ * <li>externalizedCheckpoints: flag to activate externalized checkpoints, default <code>false</code>.</li>
+ * <li>stateBackend: choice of state backend, either <code>file</code> or <code>rocks</code>, default <code>file</code>.</li>
+ * <li>killJvmOnFail: flag that determines whether an artificial failure induced by the test kills the JVM, default <code>false</code>.</li>
+ * <li>asyncCheckpoints: flag for async checkpoints with the file state backend, default <code>true</code>.</li>
+ * <li>incrementalCheckpoints: flag for incremental checkpoints with the rocks state backend, default <code>false</code>.</li>
+ * <li>delay: sleep delay to throttle down the production of the source, default 0.</li>
+ * <li>maxAttempts: the maximum number of run attempts before the job finishes with success, default 3.</li>
+ * <li>valueSize: size of the artificial value for each key in bytes, default 10.</li>
+ * </ul>
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(pt.getInt("parallelism", 1));
+		env.setMaxParallelism(pt.getInt("maxParallelism", pt.getInt("parallelism", 1)));
+		env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
+		if (pt.getBoolean("externalizedCheckpoints", false)) {
+			env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+		}
+
+		String stateBackend = pt.get("stateBackend", "file");
+		String checkpointDir = pt.getRequired("checkpointDir");
+
+		boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+		if ("file".equals(stateBackend)) {
+			boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true);
+			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+		} else if ("rocks".equals(stateBackend)) {
+			boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false);
+			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+		} else {
+			throw new IllegalArgumentException("Unknown backend: " + stateBackend);
+		}
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(pt);
+
+		// delay to throttle down the production of the source
+		long delay = pt.getLong("delay", 0L);
+
+		// the maximum number of attempts before the job finishes with success
+		int maxAttempts = pt.getInt("maxAttempts", 3);
+
+		// size of one artificial value
+		int valueSize = pt.getInt("valueSize", 10);
+
+		env.addSource(new RandomLongSource(maxAttempts, delay))
+			.keyBy((KeySelector<Long, Long>) aLong -> aLong)
+			.flatMap(new StateCreatingFlatMap(valueSize, killJvmOnFail))
+			.addSink(new PrintSinkFunction<>());
+
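+		// For reference, the end-to-end test script (test_local_recovery_and_scheduling.sh below) invokes this job
+		// with parameters such as --checkpointDir file://<dir> --parallelism 4 --stateBackend rocks
+		// --incrementalCheckpoints true --killJvmOnFail true --maxAttempts 10.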
env.execute("Sticky Allocation And Local Recovery Test"); + } + + /** + * Source function that produces a long sequence. + */ + private static final class RandomLongSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + /** + * Generator delay between two events. + */ + final long delay; + + /** + * Maximum restarts before shutting down this source. + */ + final int maxAttempts; + + /** + * State that holds the current key for recovery. + */ + transient ListState<Long> sourceCurrentKeyState; + + /** + * Generator's current key. + */ + long currentKey; + + /** + * Generator runs while this is true. + */ + volatile boolean running; + + RandomLongSource(int maxAttempts, long delay) { + this.delay = delay; + this.maxAttempts = maxAttempts; + this.running = true; + } + + @Override + public void run(SourceContext<Long> sourceContext) throws Exception { + + int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + + // the source emits one final event and shuts down once we have reached max attempts. + if (getRuntimeContext().getAttemptNumber() > maxAttempts) { + synchronized (sourceContext.getCheckpointLock()) { + sourceContext.collect(Long.MAX_VALUE - subtaskIdx); + } + return; + } + + while (running) { + + synchronized (sourceContext.getCheckpointLock()) { + sourceContext.collect(currentKey); + currentKey += numberOfParallelSubtasks; + } + + if (delay > 0) { + Thread.sleep(delay); + } + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + sourceCurrentKeyState.clear(); + sourceCurrentKeyState.add(currentKey); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + + ListStateDescriptor<Long> currentKeyDescriptor = new ListStateDescriptor<>("currentKey", Long.class); + sourceCurrentKeyState = context.getOperatorStateStore().getListState(currentKeyDescriptor); + + currentKey = getRuntimeContext().getIndexOfThisSubtask(); + Iterable<Long> iterable = sourceCurrentKeyState.get(); + if (iterable != null) { + Iterator<Long> iterator = iterable.iterator(); + if (iterator.hasNext()) { + currentKey = iterator.next(); + Preconditions.checkState(!iterator.hasNext()); + } + } + } + } + + /** + * Stateful map function. Failure creation and checks happen here. + */ + private static final class StateCreatingFlatMap + extends RichFlatMapFunction<Long, String> implements CheckpointedFunction, CheckpointListener { + + private static final long serialVersionUID = 1L; + + /** + * User configured size of the generated artificial values in the keyed state. + */ + final int valueSize; + + /** + * Holds the user configuration if the artificial test failure is killing the JVM. + */ + final boolean killTaskOnFailure; + + /** + * This state is used to create artificial keyed state in the backend. + */ + transient ValueState<String> valueState; + + /** + * This state is used to persist the schedulingAndFailureInfo to state. + */ + transient ListState<MapperSchedulingAndFailureInfo> schedulingAndFailureState; + + /** + * This contains the current scheduling and failure meta data. + */ + transient MapperSchedulingAndFailureInfo currentSchedulingAndFailureInfo; + + /** + * Message to indicate that recovery detected a failure with sticky allocation. 
+		 */
+		transient volatile String allocationFailureMessage;
+
+		/**
+		 * If this flag is true, the next invocation of the map function introduces a test failure.
+		 */
+		transient volatile boolean failTask;
+
+		StateCreatingFlatMap(int valueSize, boolean killTaskOnFailure) {
+			this.valueSize = valueSize;
+			this.failTask = false;
+			this.killTaskOnFailure = killTaskOnFailure;
+			this.allocationFailureMessage = null;
+		}
+
+		@Override
+		public void flatMap(Long key, Collector<String> collector) throws IOException {
+
+			if (allocationFailureMessage != null) {
+				// Report the failure downstream, so that we can get the message from the output.
+				collector.collect(allocationFailureMessage);
+				allocationFailureMessage = null;
+			}
+
+			if (failTask) {
+				// we fail the task, either by killing the JVM hard, or by throwing a user code exception.
+				if (killTaskOnFailure) {
+					Runtime.getRuntime().halt(-1);
+				} else {
+					throw new RuntimeException("Artificial user code exception.");
+				}
+			}
+
+			// sanity check
+			if (null != valueState.value()) {
+				throw new IllegalStateException("This should never happen, keys are generated monotonically.");
+			}
+
+			// store artificial data to blow up the state
+			valueState.update(RandomStringUtils.random(valueSize, true, true));
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+			ValueStateDescriptor<String> stateDescriptor =
+				new ValueStateDescriptor<>("state", String.class);
+			valueState = functionInitializationContext.getKeyedStateStore().getState(stateDescriptor);
+
+			ListStateDescriptor<MapperSchedulingAndFailureInfo> mapperInfoStateDescriptor =
+				new ListStateDescriptor<>("mapperState", MapperSchedulingAndFailureInfo.class);
+			schedulingAndFailureState =
+				functionInitializationContext.getOperatorStateStore().getUnionListState(mapperInfoStateDescriptor);
+
+			StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+			String allocationID = runtimeContext.getAllocationIDAsString();
+
+			final int thisJvmPid = getJvmPid();
+			final Set<Integer> killedJvmPids = new HashSet<>();
+
+			// here we check if the sticky scheduling worked as expected
+			if (functionInitializationContext.isRestored()) {
+				Iterable<MapperSchedulingAndFailureInfo> iterable = schedulingAndFailureState.get();
+				String taskNameWithSubtasks = runtimeContext.getTaskNameWithSubtasks();
+
+				MapperSchedulingAndFailureInfo infoForThisTask = null;
+				List<MapperSchedulingAndFailureInfo> completeInfo = new ArrayList<>();
+				if (iterable != null) {
+					for (MapperSchedulingAndFailureInfo testInfo : iterable) {
+
+						completeInfo.add(testInfo);
+
+						if (taskNameWithSubtasks.equals(testInfo.taskNameWithSubtask)) {
+							infoForThisTask = testInfo;
+						}
+
+						if (testInfo.killedJvm) {
+							killedJvmPids.add(testInfo.jvmPid);
+						}
+					}
+				}
+
+				Preconditions.checkNotNull(infoForThisTask, "Expected to find info here.");
+
+				if (!isScheduledToCorrectAllocation(infoForThisTask, allocationID, killedJvmPids)) {
+					allocationFailureMessage = String.format(
+						"Sticky allocation test failed: Subtask %s in attempt %d was rescheduled from allocation %s " +
+							"on JVM with PID %d to unexpected allocation %s on JVM with PID %d.\n" +
+							"Complete information from before the crash: %s.",
+						runtimeContext.getTaskNameWithSubtasks(),
+						runtimeContext.getAttemptNumber(),
+						infoForThisTask.allocationId,
+						infoForThisTask.jvmPid,
+						allocationID,
+						thisJvmPid,
+						completeInfo);
+				}
+			}
+
+			// We determine which of the subtasks will produce the artificial failure
+			boolean failingTask = shouldTaskFailForThisAttempt();
+
+			// We take note of all the meta info that we require to check sticky scheduling in the next re-attempt
+			this.currentSchedulingAndFailureInfo = new MapperSchedulingAndFailureInfo(
+				failingTask,
+				failingTask && killTaskOnFailure,
+				thisJvmPid,
+				runtimeContext.getTaskNameWithSubtasks(),
+				allocationID);
+
+			schedulingAndFailureState.clear();
+			schedulingAndFailureState.add(currentSchedulingAndFailureInfo);
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			// we can only fail the task after at least one checkpoint is completed to record progress.
+			failTask = currentSchedulingAndFailureInfo.failingTask;
+		}
+
+		private boolean shouldTaskFailForThisAttempt() {
+			RuntimeContext runtimeContext = getRuntimeContext();
+			int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+			int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+			int attempt = runtimeContext.getAttemptNumber();
+			return (attempt % numSubtasks) == subtaskIdx;
+		}
+
+		private boolean isScheduledToCorrectAllocation(
+			MapperSchedulingAndFailureInfo infoForThisTask,
+			String allocationID,
+			Set<Integer> killedJvmPids) {
+
+			return (infoForThisTask.allocationId.equals(allocationID)
+				|| killedJvmPids.contains(infoForThisTask.jvmPid));
+		}
+	}
+
+	/**
+	 * This code is copied from Stack Overflow.
+	 *
+	 * <p><a href="https://stackoverflow.com/questions/35842">https://stackoverflow.com/questions/35842</a>, answer
+	 * <a href="https://stackoverflow.com/a/12066696/9193881">https://stackoverflow.com/a/12066696/9193881</a>
+	 *
+	 * <p>Author: <a href="https://stackoverflow.com/users/446591/brad-mace">Brad Mace</a>
+	 */
+	private static int getJvmPid() throws Exception {
+		java.lang.management.RuntimeMXBean runtime =
+			java.lang.management.ManagementFactory.getRuntimeMXBean();
+		java.lang.reflect.Field jvm = runtime.getClass().getDeclaredField("jvm");
+		jvm.setAccessible(true);
+		sun.management.VMManagement mgmt =
+			(sun.management.VMManagement) jvm.get(runtime);
+		java.lang.reflect.Method pidMethod =
+			mgmt.getClass().getDeclaredMethod("getProcessId");
+		pidMethod.setAccessible(true);
+
+		return (int) (Integer) pidMethod.invoke(mgmt);
+	}
+
+	/**
+	 * Records the information required to check sticky scheduling after a restart.
+	 */
+	public static class MapperSchedulingAndFailureInfo implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * True iff this task inflicts a test failure.
+		 */
+		final boolean failingTask;
+
+		/**
+		 * True iff this task kills its JVM.
+		 */
+		final boolean killedJvm;
+
+		/**
+		 * PID of the task JVM.
+		 */
+		final int jvmPid;
+
+		/**
+		 * Name and subtask index of the task.
+		 */
+		final String taskNameWithSubtask;
+
+		/**
+		 * The current allocation id of this task.
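+		 * On restore, it is compared against the task's new allocation id to verify sticky scheduling.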
+		 */
+		final String allocationId;
+
+		MapperSchedulingAndFailureInfo(
+			boolean failingTask,
+			boolean killedJvm,
+			int jvmPid,
+			String taskNameWithSubtask,
+			String allocationId) {
+
+			this.failingTask = failingTask;
+			this.killedJvm = killedJvm;
+			this.jvmPid = jvmPid;
+			this.taskNameWithSubtask = taskNameWithSubtask;
+			this.allocationId = allocationId;
+		}
+
+		@Override
+		public String toString() {
+			return "MapperSchedulingAndFailureInfo{" +
+				"failingTask=" + failingTask +
+				", killedJvm=" + killedJvm +
+				", jvmPid=" + jvmPid +
+				", taskNameWithSubtask='" + taskNameWithSubtask + '\'' +
+				", allocationId='" + allocationId + '\'' +
+				'}';
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..37c65e9
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to INFO for debugging purposes.
+# Set it to OFF to silence logging entirely.
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 18d4b91..45b63f0 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -42,6 +42,7 @@ under the License.
 		<module>flink-bucketing-sink-test</module>
 		<module>flink-high-parallelism-iterations-test</module>
 		<module>flink-stream-stateful-job-upgrade-test</module>
+		<module>flink-local-recovery-and-allocation-test</module>
 	</modules>

 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index d4309c1..bd91bb2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -132,5 +132,13 @@ if [ $EXIT_CODE == 0 ]; then
   EXIT_CODE=$?
 fi

+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running local recovery and sticky scheduling nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 0fbac42..1db5dd2 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -318,6 +318,39 @@ function s3_delete {
     https://${bucket}.s3.amazonaws.com/${s3_file}
 }

+# This function starts the given number of task managers and monitors their processes. If a task manager process goes
+# away, a replacement is started.
+function tm_watchdog {
+  local expectedTm=$1
+  while true;
+  do
+    runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
+    count=$((expectedTm-runningTm))
+    for (( c=0; c<count; c++ ))
+    do
+      $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+    done
+    sleep 5;
+  done
+}
+
+# Kills all job managers.
+function jm_kill_all {
+  kill_all 'StandaloneSessionClusterEntrypoint'
+}
+
+# Kills all task managers.
+function tm_kill_all {
+  kill_all 'TaskManagerRunner|TaskManager'
+}
+
+# Kills all processes that match the given name.
+function kill_all {
+  local pid=`jps | grep -E "${1}" | cut -d " " -f 1`
+  kill ${pid} 2> /dev/null
+  wait ${pid} 2> /dev/null
+}
+
 function kill_random_taskmanager {
   KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
   kill -9 "$KILL_TM"
@@ -430,12 +463,14 @@ function run_test {
     return "${exit_code}"
 }

-# make sure to clean up even in case of failures
+# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
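+# Callers that have started a tm_watchdog should terminate it before invoking cleanup; otherwise the watchdog will
+# respawn the TaskManagers that tm_kill_all just killed (see cleanup_after_test in test_local_recovery_and_scheduling.sh).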
 function cleanup {
   stop_cluster
-  check_all_pass
-  rm -rf $TEST_DATA_DIR
-  rm -f $FLINK_DIR/log/*
+  tm_kill_all
+  jm_kill_all
+  rm -rf $TEST_DATA_DIR 2> /dev/null
   revert_default_config
+  check_all_pass
+  rm -rf $FLINK_DIR/log/* 2> /dev/null
 }
 trap cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
new file mode 100755
index 0000000..98ef01f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with local recovery
+function check_logs {
+    local parallelism=$1
+    local attempts=$2
+    (( expected_count=parallelism * (attempts + 1) ))
+
+    # Search for the log message that indicates a failed restore from existing local state for the keyed backend.
+    local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+    # Search for attempts to recover locally.
+    local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+    if [ ${failed_local_recovery} -ne 0 ]
+    then
+        PASS=""
+        echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
+    fi
+
+    if [ ${attempt_local_recovery} -eq 0 ]
+    then
+        PASS=""
+        echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
+    fi
+}
+
+# This function performs cleanup after the test. The configuration is restored, the watchdog is terminated, and
+# temporary files and folders are deleted.
+function cleanup_after_test {
+    # Reset the configurations
+    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"
+    #
+    kill ${watchdog_pid} 2> /dev/null
+    wait ${watchdog_pid} 2> /dev/null
+    #
+    cleanup
+}

+# Calls the cleanup step for this test and exits with an error.
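+# It is installed as an EXIT trap for the duration of the test runs and removed again before the successful exit.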
+function cleanup_after_test_and_exit_fail {
+    cleanup_after_test
+    exit 1
+}
+
+## This function executes one run for a certain configuration.
+function run_local_recovery_test {
+    local parallelism=$1
+    local max_attempts=$2
+    local backend=$3
+    local incremental=$4
+    local kill_jvm=$5
+
+    echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+    TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+    # Backup conf and configure for HA
+    backup_config
+    create_ha_config
+
+    # Enable debug logging
+    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"
+
+    # Enable local recovery
+    set_conf "state.backend.local-recovery" "true"
+    # Ensure that each TM only has one operator (chain)
+    set_conf "taskmanager.numberOfTaskSlots" "1"
+
+    rm $FLINK_DIR/log/* 2> /dev/null
+
+    # Start ZooKeeper (for HA) and the cluster
+    start_local_zk
+    start_cluster
+
+    tm_watchdog ${parallelism} &
+    watchdog_pid=$!
+
+    echo "Started TM watchdog with PID ${watchdog_pid}."
+
+    $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+    -p ${parallelism} $TEST_PROGRAM_JAR \
+    -D state.backend.local-recovery=ENABLE_FILE_BASED \
+    --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
+    --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \
+    --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBackend ${backend} \
+    --incrementalCheckpoints ${incremental}
+
+    check_logs ${parallelism} ${max_attempts}
+    cleanup_after_test
+}
+
+## MAIN
+trap cleanup_after_test_and_exit_fail EXIT
+run_local_recovery_test 4 3 "file" "false" "false"
+run_local_recovery_test 4 3 "file" "false" "true"
+run_local_recovery_test 4 10 "rocks" "false" "false"
+run_local_recovery_test 4 10 "rocks" "true" "false"
+run_local_recovery_test 4 10 "rocks" "false" "true"
+run_local_recovery_test 4 10 "rocks" "true" "true"
+trap - EXIT
+exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 6826fbd..095dc86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -142,7 +142,7 @@ public class TaskExecutorLocalStateStoresManager {
 			LocalRecoveryConfig localRecoveryConfig =
 				new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);

-			taskLocalStateStore = (localRecoveryMode != LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+			taskLocalStateStore = localRecoveryConfig.isLocalRecoveryEnabled() ?
// Real store implementation if local recovery is enabled new TaskLocalStateStoreImpl( http://git-wip-us.apache.org/repos/asf/flink/blob/da5b6d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java index 0f5b0e0..29746dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java @@ -105,6 +105,8 @@ public class BackendRestorerProcedure< ++alternativeIdx; + // IMPORTANT: please be careful when modifying the log statements because they are used for validation in + // the automatic end-to-end tests. Those tests might fail if they are not aligned with the log message! if (restoreState.isEmpty()) { LOG.debug("Creating {} with empty state.", logDescription); } else {
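For reference, a minimal sketch of running the new test on its own, assuming a built Flink distribution and a
packaged flink-local-recovery-and-allocation-test module (FLINK_DIR is the environment variable the test scripts
expect to point at the distribution; the remaining variables are derived by common.sh):

    export FLINK_DIR=/path/to/flink/build-target
    cd flink-end-to-end-tests
    ./test-scripts/test_local_recovery_and_scheduling.sh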