[tests] Add integration test for restart recovery

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d80711d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d80711d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d80711d

Branch: refs/heads/master
Commit: 7d80711d366c5a2127e50051e6c0d5b347b638c6
Parents: a110449
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 19 18:54:14 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  60 -----
 .../apache/flink/runtime/instance/Instance.java |   4 +-
 .../flink/runtime/instance/InstanceManager.java |  42 +---
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../runtime/instance/InstanceManagerTest.java   |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   1 -
 .../test/recovery/SimpleRecoveryITCase.java     | 252 +++++++++++++++++++
 .../TaskManagerFailureRecoveryITCase.java       | 184 ++++++++++++++
 9 files changed, 456 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 42a3c9a..767648a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -56,12 +56,6 @@ public final class ConfigConstants {
        public static final String JOB_MANAGER_IPC_PORT_KEY = 
"jobmanager.rpc.port";
 
        /**
-        * The config parameter defining the number of seconds that a task 
manager heartbeat may be missing before it is
-        * marked as failed.
-        */
-       public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = 
"jobmanager.max-heartbeat-delay-before-failure.msecs";
-
-       /**
         * The config parameter defining the storage directory to be used by 
the blob server.
         */
        public static final String BLOB_STORAGE_DIRECTORY_KEY = 
"blob.storage.directory";
@@ -136,34 +130,6 @@ public final class ConfigConstants {
        public static final String TASK_MANAGER_NUM_TASK_SLOTS = 
"taskmanager.numberOfTaskSlots";
 
        /**
-        * The number of incoming network IO threads (e.g. incoming connection 
threads used in NettyConnectionManager
-        * for the ServerBootstrap.)
-        */
-       public static final String TASK_MANAGER_NET_NUM_IN_THREADS_KEY = 
"taskmanager.net.numInThreads";
-
-       /**
-        * The number of outgoing network IO threads (e.g. outgoing connection 
threads used in NettyConnectionManager for
-        * the Bootstrap.)
-        */
-       public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = 
"taskmanager.net.numOutThreads";
-
-       /**
-        * The low water mark used in NettyConnectionManager for the Bootstrap.
-        */
-       public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = 
"taskmanager.net.nettyLowWaterMark";
-
-       /**
-        * The high water mark used in NettyConnectionManager for the Bootstrap.
-        */
-       public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = 
"taskmanager.net.nettyHighWaterMark";
-       
-       /**
-        * Parameter for the interval in which the TaskManager sends the 
periodic heart beat messages
-        * to the JobManager (in msecs).
-        */
-       public static final String TASK_MANAGER_HEARTBEAT_INTERVAL_KEY = 
"taskmanager.heartbeat-interval";
-
-       /**
         * Flag indicating whether to start a thread, which repeatedly logs the 
memory usage of the JVM.
         */
        public static final String 
TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = 
"taskmanager.debug.memory.startLogThread";
@@ -473,32 +439,6 @@ public final class ConfigConstants {
        public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 
32768;
 
        /**
-        * Default number of incoming network IO threads (e.g. number of 
incoming connection threads used in
-        * NettyConnectionManager for the ServerBootstrap). If set to -1, a 
reasonable default depending on the number of
-        * cores will be picked.
-        */
-       public static final int DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS = -1;
-
-       /**
-        * Default number of outgoing network IO threads (e.g. number of 
outgoing connection threads used in
-        * NettyConnectionManager for the Bootstrap). If set to -1, a 
reasonable default depending on the number of cores
-        * will be picked.
-        */
-       public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1;
-
-       /**
-        * Default low water mark used in NettyConnectionManager for the 
Bootstrap. If set to -1, NettyConnectionManager
-        * will use half of the network buffer size as the low water mark.
-        */
-       public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = 
-1;
-
-       /**
-        * Default high water mark used in NettyConnectionManager for the 
Bootstrap. If set to -1, NettyConnectionManager
-        * will use the network buffer size as the high water mark.
-        */
-       public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK 
= -1;
-
-       /**
         * Flag indicating whether to start a thread, which repeatedly logs the 
memory usage of the JVM.
         */
        public static final boolean 
DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index a5a9263..4e08389 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -329,7 +329,7 @@ public class Instance {
        
        @Override
        public String toString() {
-               return instanceId + " @ " + (taskManager != null ? 
taskManager.path() : "ActorRef.noSender") + " - " +
-                               numberOfSlots + " slots" + " - " + hashCode();
+               return String.format("%s @ %s - %d slots - URL: %s", 
instanceId, connectionInfo.getHostname(),
+                               numberOfSlots, (taskManager != null ? 
taskManager.path() : "ActorRef.noSender"));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index b28da35..2ee41da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.Set;
 
 import akka.actor.ActorRef;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,9 +55,6 @@ public class InstanceManager {
        
        /** Listeners that want to be notified about availability and 
disappearance of instances */
        private final List<InstanceListener> instanceListeners = new 
ArrayList<InstanceListener>();
-
-       /** Duration after which a task manager is considered dead if it did 
not send a heart-beat message. */
-       private final long heartbeatTimeout;
        
        /** The total number of task slots that the system has */
        private int totalNumberOfAliveTaskSlots;
@@ -72,32 +67,12 @@ public class InstanceManager {
        // 
------------------------------------------------------------------------
        
        /**
-        * Creates an instance manager, using the global configuration value 
for maximum interval between heartbeats
-        * where a task manager is still considered alive.
+        * Creates a new instance manager.
         */
        public InstanceManager() {
-               this(GlobalConfiguration.getLong(
-                               
ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
-       }
-       
-       public InstanceManager(long heartbeatTimeout) {
-               this(heartbeatTimeout, heartbeatTimeout);
-       }
-       
-       public InstanceManager(long heartbeatTimeout, long cleanupInterval) {
-               if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
-                       throw new IllegalArgumentException("Heartbeat timeout 
and cleanup interval must be positive.");
-               }
-               
                this.registeredHostsById = new HashMap<InstanceID, Instance>();
                this.registeredHostsByConnection = new HashMap<ActorRef, 
Instance>();
                this.deadHosts = new HashSet<ActorRef>();
-               this.heartbeatTimeout = heartbeatTimeout;
-       }
-       
-       public long getHeartbeatTimeout() {
-               return heartbeatTimeout;
        }
 
        public void shutdown() {
@@ -132,14 +107,19 @@ public class InstanceManager {
 
                        if (host == null){
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Received hearbeat from 
unknown TaskManager with instance ID " + instanceId.toString() + 
-                                                       " Possibly TaskManager 
was maked as dead (timed-out) earlier. " +
+                                       LOG.debug("Received heartbeat from 
unknown TaskManager with instance ID " + instanceId.toString() +
+                                                       " Possibly TaskManager 
was marked as dead (timed-out) earlier. " +
                                                        "Reporting back that 
task manager is no longer known.");
                                }
                                return false;
                        }
 
                        host.reportHeartBeat();
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Received heartbeat from TaskManager 
" + host);
+                       }
+
                        return true;
                }
        }
@@ -164,7 +144,7 @@ public class InstanceManager {
                                                " which was marked as dead 
earlier because of a heart-beat timeout.");
                        }
 
-                       InstanceID id = null;
+                       InstanceID id;
                        do {
                                id = new InstanceID();
                        } while (registeredHostsById.containsKey(id));
@@ -178,8 +158,8 @@ public class InstanceManager {
                        totalNumberOfAliveTaskSlots += numberOfSlots;
                        
                        if (LOG.isInfoEnabled()) {
-                               LOG.info(String.format("Registered TaskManager 
at %s as %s. Current number of registered hosts is %d.",
-                                               taskManager.path(), id, 
registeredHostsById.size()));
+                               LOG.info(String.format("Registered TaskManager 
at %s (%s) as %s. Current number of registered hosts is %d.",
+                                               connectionInfo.getHostname(), 
taskManager.path(), id, registeredHostsById.size()));
                        }
 
                        host.reportHeartBeat();

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1741cdb..cb8acae 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -723,9 +723,9 @@ object JobManager {
       ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
       ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
-    val delayBetweenRetries = 2 * configuration.getLong(
-      ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT)
+    val delayBetweenRetries = 2 * Duration(configuration.getString(
+      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
 
     (archiveCount, profilingEnabled, cleanupInterval, executionRetries, 
delayBetweenRetries)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 99d824b..c14ebca 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -433,7 +433,7 @@ import scala.collection.JavaConverters._
       if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
         profiler match {
           case Some(profilerActorRef) => profilerActorRef ! MonitorTask(task)
-          case None => log.info("There is no profiling enabled for the task 
manager.")
+          case None => // no log message here - floods the log
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index dff3dd3..b9aa674 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -65,10 +65,6 @@ public class InstanceManagerTest{
                try {
                        InstanceManager cm = new InstanceManager();
                        
-                       // catches error that some parts assumed config values 
in seconds, others in
-                       // milliseconds by verifying that the timeout is not 
larger than 2 minutes.
-                       assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000);
-                       
                        final int dataPort = 20000;
 
                        HardwareDescription hardwareDescription = 
HardwareDescription.extractFromSystem(4096);
@@ -76,7 +72,7 @@ public class InstanceManagerTest{
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
                        
                        // register three instances
-                       InstanceConnectionInfo ici1 = new 
InstanceConnectionInfo(address, dataPort + 0);
+                       InstanceConnectionInfo ici1 = new 
InstanceConnectionInfo(address, dataPort);
                        InstanceConnectionInfo ici2 = new 
InstanceConnectionInfo(address, dataPort + 15);
                        InstanceConnectionInfo ici3 = new 
InstanceConnectionInfo(address, dataPort + 30);
 
@@ -121,7 +117,7 @@ public class InstanceManagerTest{
 
                        HardwareDescription resources = 
HardwareDescription.extractFromSystem(4096);
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
-                       InstanceConnectionInfo ici = new 
InstanceConnectionInfo(address, dataPort + 0);
+                       InstanceConnectionInfo ici = new 
InstanceConnectionInfo(address, dataPort);
 
                        JavaTestKit probe = new JavaTestKit(system);
                        InstanceID i = cm.registerTaskManager(probe.getRef(), 
ici, resources, 1);
@@ -157,7 +153,7 @@ public class InstanceManagerTest{
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
                        
                        // register three instances
-                       InstanceConnectionInfo ici1 = new 
InstanceConnectionInfo(address, dataPort + 0);
+                       InstanceConnectionInfo ici1 = new 
InstanceConnectionInfo(address, dataPort);
                        InstanceConnectionInfo ici2 = new 
InstanceConnectionInfo(address, dataPort + 1);
                        InstanceConnectionInfo ici3 = new 
InstanceConnectionInfo(address, dataPort + 2);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 4416ba6..c26b255 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -101,7 +101,6 @@ object TestingUtils {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTMs)
-    
config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 
1000)
     config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
     new TestingCluster(config)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
new file mode 100644
index 0000000..911edb3
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that a job whose user function fails can still complete, either by re-submitting it
+ * after the failure or through the JobManager's automatic execution retries
+ * ({@code setNumberOfExecutionRetries}).
+ */
+public class SimpleRecoveryITCase {
+
+       /** Mini cluster shared by all tests of this class: 2 TaskManagers with 2 slots each. */
+       private static ForkableFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void setupCluster() {
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+
+               cluster = new ForkableFlinkMiniCluster(config, false);
+       }
+
+       @AfterClass
+       public static void teardownCluster() {
+               // if setupCluster() failed there is nothing to stop; do not mask the
+               // original setup failure with a NullPointerException-based failure
+               if (cluster == null) {
+                       return;
+               }
+               try {
+                       cluster.stop();
+               }
+               catch (Throwable t) {
+                       System.err.println("Error stopping cluster on shutdown");
+                       t.printStackTrace();
+                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+               }
+       }
+
+       @Test
+       public void testFailedRunThenSuccessfulRun() {
+
+               FailOnceMapper.failuresBeforeSuccess = 1;
+
+               try {
+                       // attempt 1: no retries allowed, so the single injected failure fails the job
+                       {
+                               List<Long> resultCollection = new ArrayList<Long>();
+                               ExecutionEnvironment env = createEnvironment(0);
+                               addSumProgram(env, resultCollection);
+
+                               try {
+                                       env.execute();
+                                       fail("The program should have failed");
+                               }
+                               catch (ProgramInvocationException e) {
+                                       // expected
+                               }
+                       }
+
+                       // attempt 2: the failure budget is used up, so re-submitting succeeds. A fresh
+                       // collection is used so that output from attempt 1 cannot skew the checked sum.
+                       {
+                               List<Long> resultCollection = new ArrayList<Long>();
+                               ExecutionEnvironment env = createEnvironment(0);
+                               addSumProgram(env, resultCollection);
+
+                               executeAndCheckSum(env, resultCollection,
+                                               "The program should have succeeded on the second run");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testRestart() {
+
+               FailOnceMapper.failuresBeforeSuccess = 1;
+
+               try {
+                       // a single retry is enough to recover from the single injected failure
+                       List<Long> resultCollection = new ArrayList<Long>();
+                       ExecutionEnvironment env = createEnvironment(1);
+                       addSumProgram(env, resultCollection);
+
+                       executeAndCheckSum(env, resultCollection,
+                                       "The program should have succeeded on the second run");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testRestartMultipleTimes() {
+
+               FailOnceMapper.failuresBeforeSuccess = 3;
+
+               try {
+                       // three injected failures, each consumed by one attempt, require three retries
+                       List<Long> resultCollection = new ArrayList<Long>();
+                       ExecutionEnvironment env = createEnvironment(3);
+                       addSumProgram(env, resultCollection);
+
+                       executeAndCheckSum(env, resultCollection,
+                                       "The program should have succeeded after 3 retries");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       // ------------------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------------------
+
+       /**
+        * Creates a remote environment for the shared mini cluster with a degree of parallelism
+        * of 4 and the given number of execution retries.
+        */
+       private static ExecutionEnvironment createEnvironment(int numberOfExecutionRetries) {
+               ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+                               "localhost", cluster.getJobManagerRPCPort());
+
+               env.setDegreeOfParallelism(4);
+               env.setNumberOfExecutionRetries(numberOfExecutionRetries);
+               return env;
+       }
+
+       /**
+        * Adds the test program to the environment: the sequence 1..10 is passed through a
+        * failure-injecting identity mapper, summed, and written into the given collection.
+        * Declared static so that the anonymous ReduceFunction does not capture an instance of
+        * this test class.
+        */
+       private static void addSumProgram(ExecutionEnvironment env, List<Long> resultCollection) {
+               env.generateSequence(1, 10)
+                               .map(new FailOnceMapper<Long>())
+                               .reduce(new ReduceFunction<Long>() {
+                                       @Override
+                                       public Long reduce(Long value1, Long value2) {
+                                               return value1 + value2;
+                                       }
+                               })
+                               .output(new LocalCollectionOutputFormat<Long>(resultCollection));
+       }
+
+       /**
+        * Executes the program and checks the execution result: non-negative net runtime and an
+        * empty accumulator map. It then verifies that the collected output sums to 55
+        * (= 1 + 2 + ... + 10).
+        *
+        * @param failureMessage message reported if execution throws a JobExecutionException
+        */
+       private static void executeAndCheckSum(ExecutionEnvironment env, List<Long> resultCollection,
+                       String failureMessage) throws Exception {
+               try {
+                       JobExecutionResult result = env.execute();
+                       assertTrue(result.getNetRuntime() >= 0);
+                       assertNotNull(result.getAllAccumulatorResults());
+                       assertTrue(result.getAllAccumulatorResults().isEmpty());
+               }
+               catch (JobExecutionException e) {
+                       e.printStackTrace();
+                       // keep the cause visible in the test report
+                       fail(failureMessage + ": " + e.getMessage());
+               }
+
+               long sum = 0;
+               for (long value : resultCollection) {
+                       sum += value;
+               }
+               assertEquals(55, sum);
+       }
+
+       /**
+        * Identity mapper whose subtask with index 1 throws an exception while the shared failure
+        * budget ({@link #failuresBeforeSuccess}) is positive, decrementing the budget on each throw.
+        * The budget is a static field, so this relies on the tasks running in the test's JVM
+        * (assumed to hold for the local mini cluster).
+        */
+       private static class FailOnceMapper<T> extends RichMapFunction<T, T> {
+
+               /**
+                * Remaining number of injected failures. Volatile because the test thread sets it
+                * while task threads read and decrement it. The decrement itself is not atomic, which
+                * is fine because only subtask 1 writes it.
+                */
+               private static volatile int failuresBeforeSuccess = 0;
+
+               @Override
+               public T map(T value) throws Exception {
+                       if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               failuresBeforeSuccess--;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
new file mode 100644
index 0000000..85856ba
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class TaskManagerFailureRecoveryITCase {
+
+       @Test
+       public void testRestartWithFailingTaskManager() {
+
+               final int PARALLELISM = 4;
+
+               ForkableFlinkMiniCluster cluster = null;
+               ActorSystem additionalSystem = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
2);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+
+                       
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+                       
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+                       config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 
2);
+
+                       cluster = new ForkableFlinkMiniCluster(config, false);
+
+                       // for the result
+                       List<Long> resultCollection = new ArrayList<Long>();
+
+                       final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", 
cluster.getJobManagerRPCPort());
+
+                       env.setDegreeOfParallelism(PARALLELISM);
+                       env.setNumberOfExecutionRetries(1);
+
+                       env.generateSequence(1, 10)
+                                       .map(new FailingMapper<Long>())
+                                       .reduce(new ReduceFunction<Long>() {
+                                               @Override
+                                               public Long reduce(Long value1, 
Long value2) {
+                                                       return value1 + value2;
+                                               }
+                                       })
+                                       .output(new 
LocalCollectionOutputFormat<Long>(resultCollection));
+
+
+                       // simple reference (atomic does not matter) to pass 
back an exception from the trigger thread
+                       final AtomicReference<Throwable> ref = new 
AtomicReference<Throwable>();
+
+                       // trigger the execution from a separate thread, so we 
are available to temper with the
+                       // cluster during the execution
+                       Thread trigger = new Thread("program trigger") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               env.execute();
+                                       }
+                                       catch (Throwable t) {
+                                               ref.set(t);
+                                       }
+                               }
+                       };
+                       trigger.setDaemon(true);
+                       trigger.start();
+
+                       // block until all the mappers are actually deployed
+                       // the mappers in turn are waiting
+                       for (int i = 0; i < PARALLELISM; i++) {
+                               FailingMapper.TASK_TO_COORD_QUEUE.take();
+                       }
+
+                       // bring up one more task manager and wait for it to 
appear
+                       {
+                               additionalSystem = 
cluster.startTaskManagerActorSystem(2);
+                               ActorRef additionalTaskManager = 
cluster.startTaskManager(2, additionalSystem);
+                               Object message = 
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage();
+                               Future<Object> future = 
Patterns.ask(additionalTaskManager, message, 30000);
+
+                               try {
+                                       Await.result(future, new 
FiniteDuration(30000, TimeUnit.MILLISECONDS));
+                               }
+                               catch (TimeoutException e) {
+                                       fail ("The additional TaskManager did 
not come up within 30 seconds");
+                               }
+                       }
+
+                       // kill the two other TaskManagers
+                       for (ActorRef tm : cluster.getTaskManagersAsJava()) {
+                               tm.tell(PoisonPill.getInstance(), null);
+                       }
+
+                       // wait for the next set of mappers (the recovery ones) 
to come online
+                       for (int i = 0; i < PARALLELISM; i++) {
+                               FailingMapper.TASK_TO_COORD_QUEUE.take();
+                       }
+
+                       // tell the mappers that they may continue this time
+                       for (int i = 0; i < PARALLELISM; i++) {
+                               FailingMapper.COORD_TO_TASK_QUEUE.add(new 
Object());
+                       }
+
+                       // wait for the program to finish
+                       trigger.join();
+                       if (ref.get() != null) {
+                               Throwable t = ref.get();
+                               t.printStackTrace();
+                               fail("Program execution caused an exception: " 
+ t.getMessage());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (additionalSystem != null) {
+                               additionalSystem.shutdown();
+                       }
+                       if (cluster != null) {
+                               cluster.stop();
+                       }
+               }
+       }
+
+       private static class FailingMapper<T> extends RichMapFunction<T, T> {
+
+               private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE 
= new LinkedBlockingQueue<Object>();
+
+               private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE 
= new LinkedBlockingQueue<Object>();
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       TASK_TO_COORD_QUEUE.add(new Object());
+                       COORD_TO_TASK_QUEUE.take();
+               }
+
+               @Override
+               public T map(T value) throws Exception {
+                       return value;
+               }
+       }
+}

Reply via email to