[FLINK-8910][e2e] Automated test for local recovery (including sticky 
allocation)

This closes #5676.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/489e4281
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/489e4281
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/489e4281

Branch: refs/heads/master
Commit: 489e42811157a9b2575f259f7cda2a2ee680d008
Parents: edece9c
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Mar 6 10:35:44 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu May 17 10:03:04 2018 +0200

----------------------------------------------------------------------
 .../pom.xml                                     |  96 ++++
 ...StickyAllocationAndLocalRecoveryTestJob.java | 478 +++++++++++++++++++
 .../src/main/resources/log4j-test.properties    |  27 ++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   8 +
 flink-end-to-end-tests/test-scripts/common.sh   |  43 +-
 .../test_local_recovery_and_scheduling.sh       | 121 +++++
 .../TaskExecutorLocalStateStoresManager.java    |   2 +-
 .../api/operators/BackendRestorerProcedure.java |   2 +
 9 files changed, 773 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
new file mode 100644
index 0000000..4b966e2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.6-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-local-recovery-and-allocation-test</artifactId>
+       <name>flink-local-recovery-and-allocation-test</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-log4j12</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <version>2.4</version>
+                               <executions>
+                                       <!-- 
StickyAllocationAndLocalRecoveryTestJob -->
+                                       <execution>
+                                               
<id>StickyAllocationAndLocalRecoveryTestJob</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<finalName>StickyAllocationAndLocalRecoveryTestJob</finalName>
+
+                                                       <archive>
+                                                               
<manifestEntries>
+                                                                       
<program-class>org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob</program-class>
+                                                               
</manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               
<include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.class</include>
+                                                               
<include>org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob$*.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
new file mode 100644
index 0000000..b03791e
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky allocation).
+ *
+ * <p>List of possible input parameters for this job:
+ * <ul>
+ *     <li>checkpointDir: the checkpoint directory, required.</li>
+ *     <li>parallelism: the parallelism of the job, default 1.</li>
+ *     <li>maxParallelism: the maximum parallelism of the job, default 1.</li>
+ *     <li>checkpointInterval: the checkpointing interval in milliseconds, 
default 1000.</li>
+ *     <li>restartDelay: the delay of the fixed delay restart strategy, 
default 0.</li>
+ *     <li>externalizedCheckpoints: flag to activate externalized checkpoints, 
default <code>false</code>.</li>
+ *     <li>stateBackend: choice for state backend between <code>file</code> 
and <code>rocks</code>, default <code>file</code>.</li>
+ *     <li>killJvmOnFail: flag that determines whether or not an artificial 
failure induced by the test kills the JVM or not.</li>
+ *     <li>asyncCheckpoints: flag for async checkpoints with file state 
backend, default <code>true</code>.</li>
+ *     <li>incrementalCheckpoints: flag for incremental checkpoint with rocks 
state backend, default <code>false</code>.</li>
+ *     <li>delay: sleep delay to throttle down the production of the source, 
default 0.</li>
+ *     <li>maxAttempts: the maximum number of run attempts, before the job 
finishes with success, default 3.</li>
+ *     <li>valueSize: size of the artificial value for each key in bytes, 
default 10.</li>
+ * </ul>
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool pt = ParameterTool.fromArgs(args);
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env.setParallelism(pt.getInt("parallelism", 1));
+               env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+               env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+               
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+               if (pt.getBoolean("externalizedCheckpoints", false)) {
+                       
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+               }
+
+               String stateBackend = pt.get("stateBackend", "file");
+               String checkpointDir = pt.getRequired("checkpointDir");
+
+               boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+               if ("file".equals(stateBackend)) {
+                       boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", true);
+                       env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints));
+               } else if ("rocks".equals(stateBackend)) {
+                       boolean incrementalCheckpoints = 
pt.getBoolean("incrementalCheckpoints", false);
+                       env.setStateBackend(new 
RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+               } else {
+                       throw new IllegalArgumentException("Unknown backend: " 
+ stateBackend);
+               }
+
+               // make parameters available in the web interface
+               env.getConfig().setGlobalJobParameters(pt);
+
+               // delay to throttle down the production of the source
+               long delay = pt.getLong("delay", 0L);
+
+               // the maximum number of attempts, before the job finishes with 
success
+               int maxAttempts = pt.getInt("maxAttempts", 3);
+
+               // size of one artificial value
+               int valueSize = pt.getInt("valueSize", 10);
+
+               env.addSource(new RandomLongSource(maxAttempts, delay))
+                       .keyBy((KeySelector<Long, Long>) aLong -> aLong)
+                       .flatMap(new StateCreatingFlatMap(valueSize, 
killJvmOnFail))
+                       .addSink(new PrintSinkFunction<>());
+
+               env.execute("Sticky Allocation And Local Recovery Test");
+       }
+
+       /**
+        * Source function that produces a long sequence.
+        */
+       private static final class RandomLongSource extends 
RichParallelSourceFunction<Long> implements CheckpointedFunction {
+
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * Generator delay between two events.
+                */
+               final long delay;
+
+               /**
+                * Maximum restarts before shutting down this source.
+                */
+               final int maxAttempts;
+
+               /**
+                * State that holds the current key for recovery.
+                */
+               transient ListState<Long> sourceCurrentKeyState;
+
+               /**
+                * Generator's current key.
+                */
+               long currentKey;
+
+               /**
+                * Generator runs while this is true.
+                */
+               volatile boolean running;
+
+               RandomLongSource(int maxAttempts, long delay) {
+                       this.delay = delay;
+                       this.maxAttempts = maxAttempts;
+                       this.running = true;
+               }
+
+               @Override
+               public void run(SourceContext<Long> sourceContext) throws 
Exception {
+
+                       int numberOfParallelSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+                       int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
+
+                       // the source emits one final event and shuts down once 
we have reached max attempts.
+                       if (getRuntimeContext().getAttemptNumber() > 
maxAttempts) {
+                               synchronized 
(sourceContext.getCheckpointLock()) {
+                                       sourceContext.collect(Long.MAX_VALUE - 
subtaskIdx);
+                               }
+                               return;
+                       }
+
+                       while (running) {
+
+                               synchronized 
(sourceContext.getCheckpointLock()) {
+                                       sourceContext.collect(currentKey);
+                                       currentKey += numberOfParallelSubtasks;
+                               }
+
+                               if (delay > 0) {
+                                       Thread.sleep(delay);
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
+                       sourceCurrentKeyState.clear();
+                       sourceCurrentKeyState.add(currentKey);
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext 
context) throws Exception {
+
+                       ListStateDescriptor<Long> currentKeyDescriptor = new 
ListStateDescriptor<>("currentKey", Long.class);
+                       sourceCurrentKeyState = 
context.getOperatorStateStore().getListState(currentKeyDescriptor);
+
+                       currentKey = 
getRuntimeContext().getIndexOfThisSubtask();
+                       Iterable<Long> iterable = sourceCurrentKeyState.get();
+                       if (iterable != null) {
+                               Iterator<Long> iterator = iterable.iterator();
+                               if (iterator.hasNext()) {
+                                       currentKey = iterator.next();
+                                       
Preconditions.checkState(!iterator.hasNext());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Stateful map function. Failure creation and checks happen here.
+        */
+       private static final class StateCreatingFlatMap
+               extends RichFlatMapFunction<Long, String> implements 
CheckpointedFunction, CheckpointListener {
+
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * User configured size of the generated artificial values in 
the keyed state.
+                */
+               final int valueSize;
+
+               /**
+                * Holds the user configuration if the artificial test failure 
is killing the JVM.
+                */
+               final boolean killTaskOnFailure;
+
+               /**
+                * This state is used to create artificial keyed state in the 
backend.
+                */
+               transient ValueState<String> valueState;
+
+               /**
+                * This state is used to persist the schedulingAndFailureInfo 
to state.
+                */
+               transient ListState<MapperSchedulingAndFailureInfo> 
schedulingAndFailureState;
+
+               /**
+                * This contains the current scheduling and failure meta data.
+                */
+               transient MapperSchedulingAndFailureInfo 
currentSchedulingAndFailureInfo;
+
+               /**
+                * Message to indicate that recovery detected a failure with 
sticky allocation.
+                */
+               transient volatile String allocationFailureMessage;
+
+               /**
+                * If this flag is true, the next invocation of the map 
function introduces a test failure.
+                */
+               transient volatile boolean failTask;
+
+               StateCreatingFlatMap(int valueSize, boolean killTaskOnFailure) {
+                       this.valueSize = valueSize;
+                       this.failTask = false;
+                       this.killTaskOnFailure = killTaskOnFailure;
+                       this.allocationFailureMessage = null;
+               }
+
+               @Override
+               public void flatMap(Long key, Collector<String> collector) 
throws IOException {
+
+                       if (allocationFailureMessage != null) {
+                               // Report the failure downstream, so that we 
can get the message from the output.
+                               collector.collect(allocationFailureMessage);
+                               allocationFailureMessage = null;
+                       }
+
+                       if (failTask) {
+                               // we fail the task, either by killing the JVM 
hard, or by throwing a user code exception.
+                               if (killTaskOnFailure) {
+                                       Runtime.getRuntime().halt(-1);
+                               } else {
+                                       throw new RuntimeException("Artificial 
user code exception.");
+                               }
+                       }
+
+                       // sanity check
+                       if (null != valueState.value()) {
+                               throw new IllegalStateException("This should 
never happen, keys are generated monotonously.");
+                       }
+
+                       // store artificial data to blow up the state
+                       valueState.update(RandomStringUtils.random(valueSize, 
true, true));
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext 
functionSnapshotContext) {
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext 
functionInitializationContext) throws Exception {
+                       ValueStateDescriptor<String> stateDescriptor =
+                               new ValueStateDescriptor<>("state", 
String.class);
+                       valueState = 
functionInitializationContext.getKeyedStateStore().getState(stateDescriptor);
+
+                       ListStateDescriptor<MapperSchedulingAndFailureInfo> 
mapperInfoStateDescriptor =
+                               new ListStateDescriptor<>("mapperState", 
MapperSchedulingAndFailureInfo.class);
+                       schedulingAndFailureState =
+                               
functionInitializationContext.getOperatorStateStore().getUnionListState(mapperInfoStateDescriptor);
+
+                       StreamingRuntimeContext runtimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
+                       String allocationID = 
runtimeContext.getAllocationIDAsString();
+
+                       final int thisJvmPid = getJvmPid();
+                       final Set<Integer> killedJvmPids = new HashSet<>();
+
+                       // here we check if the sticky scheduling worked as 
expected
+                       if (functionInitializationContext.isRestored()) {
+                               Iterable<MapperSchedulingAndFailureInfo> 
iterable = schedulingAndFailureState.get();
+                               String taskNameWithSubtasks = 
runtimeContext.getTaskNameWithSubtasks();
+
+                               MapperSchedulingAndFailureInfo infoForThisTask 
= null;
+                               List<MapperSchedulingAndFailureInfo> 
completeInfo = new ArrayList<>();
+                               if (iterable != null) {
+                                       for (MapperSchedulingAndFailureInfo 
testInfo : iterable) {
+
+                                               completeInfo.add(testInfo);
+
+                                               if 
(taskNameWithSubtasks.equals(testInfo.taskNameWithSubtask)) {
+                                                       infoForThisTask = 
testInfo;
+                                               }
+
+                                               if (testInfo.killedJvm) {
+                                                       
killedJvmPids.add(testInfo.jvmPid);
+                                               }
+                                       }
+                               }
+
+                               Preconditions.checkNotNull(infoForThisTask, 
"Expected to find info here.");
+
+                               if 
(!isScheduledToCorrectAllocation(infoForThisTask, allocationID, killedJvmPids)) 
{
+                                       allocationFailureMessage = 
String.format(
+                                               "Sticky allocation test failed: 
Subtask %s in attempt %d was rescheduled from allocation %s " +
+                                                       "on JVM with PID %d to 
unexpected allocation %s on JVM with PID %d.\n" +
+                                                       "Complete information 
from before the crash: %s.",
+                                               
runtimeContext.getTaskNameWithSubtasks(),
+                                               
runtimeContext.getAttemptNumber(),
+                                               infoForThisTask.allocationId,
+                                               infoForThisTask.jvmPid,
+                                               allocationID,
+                                               thisJvmPid,
+                                               completeInfo);
+                               }
+                       }
+
+                       // We determine which of the subtasks will produce the 
artificial failure
+                       boolean failingTask = shouldTaskFailForThisAttempt();
+
+                       // We take note of all the meta info that we require to 
check sticky scheduling in the next re-attempt
+                       this.currentSchedulingAndFailureInfo = new 
MapperSchedulingAndFailureInfo(
+                               failingTask,
+                               failingTask && killTaskOnFailure,
+                               thisJvmPid,
+                               runtimeContext.getTaskNameWithSubtasks(),
+                               allocationID);
+
+                       schedulingAndFailureState.clear();
+                       
schedulingAndFailureState.add(currentSchedulingAndFailureInfo);
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       // we can only fail the task after at least one 
checkpoint is completed to record progress.
+                       failTask = currentSchedulingAndFailureInfo.failingTask;
+               }
+
+               private boolean shouldTaskFailForThisAttempt() {
+                       RuntimeContext runtimeContext = getRuntimeContext();
+                       int numSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
+                       int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+                       int attempt = runtimeContext.getAttemptNumber();
+                       return (attempt % numSubtasks) == subtaskIdx;
+               }
+
+               private boolean isScheduledToCorrectAllocation(
+                       MapperSchedulingAndFailureInfo infoForThisTask,
+                       String allocationID,
+                       Set<Integer> killedJvmPids) {
+
+                       return 
(infoForThisTask.allocationId.equals(allocationID)
+                               || 
killedJvmPids.contains(infoForThisTask.jvmPid));
+               }
+       }
+
+       /**
+        * This code is copied from Stack Overflow.
+        *
+        * <p><a 
href="https://stackoverflow.com/questions/35842";>https://stackoverflow.com/questions/35842</a>,
 answer
+        * <a 
href="https://stackoverflow.com/a/12066696/9193881";>https://stackoverflow.com/a/12066696/9193881</a>
+        *
+        * <p>Author: <a 
href="https://stackoverflow.com/users/446591/brad-mace";>Brad Mace</a>)
+        */
+       private static int getJvmPid() throws Exception {
+               java.lang.management.RuntimeMXBean runtime =
+                       
java.lang.management.ManagementFactory.getRuntimeMXBean();
+               java.lang.reflect.Field jvm = 
runtime.getClass().getDeclaredField("jvm");
+               jvm.setAccessible(true);
+               sun.management.VMManagement mgmt =
+                       (sun.management.VMManagement) jvm.get(runtime);
+               java.lang.reflect.Method pidMethod =
+                       mgmt.getClass().getDeclaredMethod("getProcessId");
+               pidMethod.setAccessible(true);
+
+               return (int) (Integer) pidMethod.invoke(mgmt);
+       }
+
+       /**
+        * Records the information required to check sticky scheduling after a 
restart.
+        */
+       public static class MapperSchedulingAndFailureInfo implements 
Serializable {
+
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * True iff this task inflicts a test failure.
+                */
+               final boolean failingTask;
+
+               /**
+                * True iff this task kills its JVM.
+                */
+               final boolean killedJvm;
+
+               /**
+                * PID of the task JVM.
+                */
+               final int jvmPid;
+
+               /**
+                * Name and subtask index of the task.
+                */
+               final String taskNameWithSubtask;
+
+               /**
+                * The current allocation id of this task.
+                */
+               final String allocationId;
+
+               MapperSchedulingAndFailureInfo(
+                       boolean failingTask,
+                       boolean killedJvm,
+                       int jvmPid,
+                       String taskNameWithSubtask,
+                       String allocationId) {
+
+                       this.failingTask = failingTask;
+                       this.killedJvm = killedJvm;
+                       this.jvmPid = jvmPid;
+                       this.taskNameWithSubtask = taskNameWithSubtask;
+                       this.allocationId = allocationId;
+               }
+
+               @Override
+               public String toString() {
+                       return "MapperTestInfo{" +
+                               "failingTask=" + failingTask +
+                               ", killedJvm=" + killedJvm +
+                               ", jvmPid=" + jvmPid +
+                               ", taskNameWithSubtask='" + taskNameWithSubtask 
+ '\'' +
+                               ", allocationId='" + allocationId + '\'' +
+                               '}';
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..37c65e9
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=INFO, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index dadb46f..367e120 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,7 @@ under the License.
                <module>flink-distributed-cache-via-blob-test</module>
                <module>flink-high-parallelism-iterations-test</module>
                <module>flink-stream-stateful-job-upgrade-test</module>
+               <module>flink-local-recovery-and-allocation-test</module>
        </modules>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index d4309c1..bd91bb2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -132,5 +132,13 @@ if [ $EXIT_CODE == 0 ]; then
   EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running local recovery and sticky scheduling nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index ec963c5..56a5d27 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -320,6 +320,39 @@ function s3_delete {
     https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+# This function starts the given number of task managers and monitors their processes. If a task manager process goes
+# away a replacement is started.
+function tm_watchdog {
+  local expectedTm=$1
+  while true;
+  do
+    runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
+    count=$((expectedTm-runningTm))
+    for (( c=0; c<count; c++ ))
+    do
+      $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+    done
+    sleep 5;
+  done
+}
+
+# Kills all job managers.
+function jm_kill_all {
+  kill_all 'StandaloneSessionClusterEntrypoint'
+}
+
+# Kills all task managers.
+function tm_kill_all {
+  kill_all 'TaskManagerRunner|TaskManager'
+}
+
+# Kills all processes that match the given name.
+function kill_all {
+  local pid=`jps | grep -E "${1}" | cut -d " " -f 1`
+  kill ${pid} 2> /dev/null
+  wait ${pid} 2> /dev/null
+}
+
 function kill_random_taskmanager {
   KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
   kill -9 "$KILL_TM"
@@ -432,12 +465,14 @@ function run_test {
     return "${exit_code}"
 }
 
-# make sure to clean up even in case of failures
+# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
 function cleanup {
   stop_cluster
-  check_all_pass
-  rm -rf $TEST_DATA_DIR
-  rm -f $FLINK_DIR/log/*
+  tm_kill_all
+  jm_kill_all
+  rm -rf $TEST_DATA_DIR 2> /dev/null
   revert_default_config
+  check_all_pass
+  rm -rf $FLINK_DIR/log/* 2> /dev/null
 }
 trap cleanup EXIT

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh 
b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
new file mode 100755
index 0000000..98ef01f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with local recovery
+function check_logs {
+    local parallelism=$1
+    local attempts=$2
+    (( expected_count=parallelism * (attempts + 1) ))
+
+    # Search for the log message that indicates restore problem from existing local state for the keyed backend.
+    local failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+    # Search for attempts to recover locally.
+    local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+    if [ ${failed_local_recovery} -ne 0 ]
+    then
+        PASS=""
+        echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
+    fi
+
+    if [ ${attempt_local_recovery} -eq 0 ]
+    then
+        PASS=""
+        echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
+    fi
+}
+
+# This function does a cleanup after the test. The configuration is restored, the watchdog is terminated and temporary
+# files and folders are deleted.
+function cleanup_after_test {
+    # Reset the configurations
+    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"
+    #
+    kill ${watchdog_pid} 2> /dev/null
+    wait ${watchdog_pid} 2> /dev/null
+    #
+    cleanup
+}
+
+# Calls the cleanup step for this test and exits with an error.
+function cleanup_after_test_and_exit_fail {
+    cleanup_after_test
+    exit 1
+}
+
+## This function executes one run for a certain configuration
+function run_local_recovery_test {
+    local parallelism=$1
+    local max_attempts=$2
+    local backend=$3
+    local incremental=$4
+    local kill_jvm=$5
+
+    echo "Running local recovery test on ${backend} backend: incremental checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+    TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+    # Backup conf and configure for HA
+    backup_config
+    create_ha_config
+
+    # Enable debug logging
+    sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"
+
+    # Enable local recovery
+    set_conf "state.backend.local-recovery" "true"
+    # Ensure that each TM only has one operator(chain)
+    set_conf "taskmanager.numberOfTaskSlots" "1"
+
+    rm $FLINK_DIR/log/* 2> /dev/null
+
+    # Start HA server
+    start_local_zk
+    start_cluster
+
+    tm_watchdog ${parallelism} &
+    watchdog_pid=$!
+
+    echo "Started TM watchdog with PID ${watchdog_pid}."
+
+    $FLINK_DIR/bin/flink run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+    -p ${parallelism} $TEST_PROGRAM_JAR \
+    -D state.backend.local-recovery=ENABLE_FILE_BASED \
+    --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
+    --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail ${kill_jvm} --checkpointInterval 1000 \
+    --maxAttempts ${max_attempts} --parallelism ${parallelism} --stateBackend ${backend} \
+    --incrementalCheckpoints ${incremental}
+
+    check_logs ${parallelism} ${max_attempts}
+    cleanup_after_test
+}
+
+## MAIN
+trap cleanup_after_test_and_exit_fail EXIT
+run_local_recovery_test 4 3 "file" "false" "false"
+run_local_recovery_test 4 3 "file" "false" "true"
+run_local_recovery_test 4 10 "rocks" "false" "false"
+run_local_recovery_test 4 10 "rocks" "true" "false"
+run_local_recovery_test 4 10 "rocks" "false" "true"
+run_local_recovery_test 4 10 "rocks" "true" "true"
+trap - EXIT
+exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 6826fbd..095dc86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -142,7 +142,7 @@ public class TaskExecutorLocalStateStoresManager {
                                LocalRecoveryConfig localRecoveryConfig =
+                                       new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
 
-                               taskLocalStateStore = (localRecoveryMode != LocalRecoveryConfig.LocalRecoveryMode.DISABLED) ?
+                               taskLocalStateStore = localRecoveryConfig.isLocalRecoveryEnabled() ?
 
+                                               // Real store implementation if local recovery is enabled
                                                new TaskLocalStateStoreImpl(

http://git-wip-us.apache.org/repos/asf/flink/blob/489e4281/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index 0f5b0e0..29746dc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -105,6 +105,8 @@ public class BackendRestorerProcedure<
 
                        ++alternativeIdx;
 
+                       // IMPORTANT: please be careful when modifying the log statements because they are used for validation in
+                       // the automatic end-to-end tests. Those tests might fail if they are not aligned with the log message!
                        if (restoreState.isEmpty()) {
                                LOG.debug("Creating {} with empty state.", 
logDescription);
                        } else {

Reply via email to