[tests] Add integration test for restart recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d80711d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d80711d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d80711d Branch: refs/heads/master Commit: 7d80711d366c5a2127e50051e6c0d5b347b638c6 Parents: a110449 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 16 21:40:06 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 19 18:54:14 2015 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 60 ----- .../apache/flink/runtime/instance/Instance.java | 4 +- .../flink/runtime/instance/InstanceManager.java | 42 +--- .../flink/runtime/jobmanager/JobManager.scala | 6 +- .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../runtime/instance/InstanceManagerTest.java | 10 +- .../runtime/testingUtils/TestingUtils.scala | 1 - .../test/recovery/SimpleRecoveryITCase.java | 252 +++++++++++++++++++ .../TaskManagerFailureRecoveryITCase.java | 184 ++++++++++++++ 9 files changed, 456 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 42a3c9a..767648a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -56,12 +56,6 @@ public final class ConfigConstants { public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port"; /** - * The config parameter defining the number of seconds that a 
task manager heartbeat may be missing before it is - * marked as failed. - */ - public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.msecs"; - - /** * The config parameter defining the storage directory to be used by the blob server. */ public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory"; @@ -136,34 +130,6 @@ public final class ConfigConstants { public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots"; /** - * The number of incoming network IO threads (e.g. incoming connection threads used in NettyConnectionManager - * for the ServerBootstrap.) - */ - public static final String TASK_MANAGER_NET_NUM_IN_THREADS_KEY = "taskmanager.net.numInThreads"; - - /** - * The number of outgoing network IO threads (e.g. outgoing connection threads used in NettyConnectionManager for - * the Bootstrap.) - */ - public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = "taskmanager.net.numOutThreads"; - - /** - * The low water mark used in NettyConnectionManager for the Bootstrap. - */ - public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = "taskmanager.net.nettyLowWaterMark"; - - /** - * The high water mark used in NettyConnectionManager for the Bootstrap. - */ - public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark"; - - /** - * Parameter for the interval in which the TaskManager sends the periodic heart beat messages - * to the JobManager (in msecs). - */ - public static final String TASK_MANAGER_HEARTBEAT_INTERVAL_KEY = "taskmanager.heartbeat-interval"; - - /** * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM. 
*/ public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread"; @@ -473,32 +439,6 @@ public final class ConfigConstants { public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768; /** - * Default number of incoming network IO threads (e.g. number of incoming connection threads used in - * NettyConnectionManager for the ServerBootstrap). If set to -1, a reasonable default depending on the number of - * cores will be picked. - */ - public static final int DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS = -1; - - /** - * Default number of outgoing network IO threads (e.g. number of outgoing connection threads used in - * NettyConnectionManager for the Bootstrap). If set to -1, a reasonable default depending on the number of cores - * will be picked. - */ - public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1; - - /** - * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager - * will use half of the network buffer size as the low water mark. - */ - public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = -1; - - /** - * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager - * will use the network buffer size as the high water mark. - */ - public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1; - - /** * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM. 
*/ public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false; http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index a5a9263..4e08389 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -329,7 +329,7 @@ public class Instance { @Override public String toString() { - return instanceId + " @ " + (taskManager != null ? taskManager.path() : "ActorRef.noSender") + " - " + - numberOfSlots + " slots" + " - " + hashCode(); + return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(), + numberOfSlots, (taskManager != null ? 
taskManager.path() : "ActorRef.noSender")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index b28da35..2ee41da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -27,8 +27,6 @@ import java.util.Map; import java.util.Set; import akka.actor.ActorRef; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,9 +55,6 @@ public class InstanceManager { /** Listeners that want to be notified about availability and disappearance of instances */ private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>(); - - /** Duration after which a task manager is considered dead if it did not send a heart-beat message. */ - private final long heartbeatTimeout; /** The total number of task slots that the system has */ private int totalNumberOfAliveTaskSlots; @@ -72,32 +67,12 @@ public class InstanceManager { // ------------------------------------------------------------------------ /** - * Creates an instance manager, using the global configuration value for maximum interval between heartbeats - * where a task manager is still considered alive. + * Creates a new instance manager. 
*/ public InstanceManager() { - this(GlobalConfiguration.getLong( - ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT)); - } - - public InstanceManager(long heartbeatTimeout) { - this(heartbeatTimeout, heartbeatTimeout); - } - - public InstanceManager(long heartbeatTimeout, long cleanupInterval) { - if (heartbeatTimeout <= 0 || cleanupInterval <= 0) { - throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive."); - } - this.registeredHostsById = new HashMap<InstanceID, Instance>(); this.registeredHostsByConnection = new HashMap<ActorRef, Instance>(); this.deadHosts = new HashSet<ActorRef>(); - this.heartbeatTimeout = heartbeatTimeout; - } - - public long getHeartbeatTimeout() { - return heartbeatTimeout; } public void shutdown() { @@ -132,14 +107,19 @@ public class InstanceManager { if (host == null){ if (LOG.isDebugEnabled()) { - LOG.debug("Received hearbeat from unknown TaskManager with instance ID " + instanceId.toString() + - " Possibly TaskManager was maked as dead (timed-out) earlier. " + + LOG.debug("Received heartbeat from unknown TaskManager with instance ID " + instanceId.toString() + + " Possibly TaskManager was marked as dead (timed-out) earlier. " + "Reporting back that task manager is no longer known."); } return false; } host.reportHeartBeat(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Received heartbeat from TaskManager " + host); + } + return true; } } @@ -164,7 +144,7 @@ public class InstanceManager { " which was marked as dead earlier because of a heart-beat timeout."); } - InstanceID id = null; + InstanceID id; do { id = new InstanceID(); } while (registeredHostsById.containsKey(id)); @@ -178,8 +158,8 @@ public class InstanceManager { totalNumberOfAliveTaskSlots += numberOfSlots; if (LOG.isInfoEnabled()) { - LOG.info(String.format("Registered TaskManager at %s as %s. 
Current number of registered hosts is %d.", - taskManager.path(), id, registeredHostsById.size())); + LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.", + connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size())); } host.reportHeartBeat(); http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1741cdb..cb8acae 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -723,9 +723,9 @@ object JobManager { ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES) - val delayBetweenRetries = 2 * configuration.getLong( - ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT) + val delayBetweenRetries = 2 * Duration(configuration.getString( + ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) } http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 99d824b..c14ebca 100644 --- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -433,7 +433,7 @@ import scala.collection.JavaConverters._ if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) { profiler match { case Some(profilerActorRef) => profilerActorRef ! MonitorTask(task) - case None => log.info("There is no profiling enabled for the task manager.") + case None => // no log message here - floods the log } } http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java index dff3dd3..b9aa674 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java @@ -65,10 +65,6 @@ public class InstanceManagerTest{ try { InstanceManager cm = new InstanceManager(); - // catches error that some parts assumed config values in seconds, others in - // milliseconds by verifying that the timeout is not larger than 2 minutes. 
- assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000); - final int dataPort = 20000; HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096); @@ -76,7 +72,7 @@ public class InstanceManagerTest{ InetAddress address = InetAddress.getByName("127.0.0.1"); // register three instances - InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort + 0); + InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort); InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 15); InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 30); @@ -121,7 +117,7 @@ public class InstanceManagerTest{ HardwareDescription resources = HardwareDescription.extractFromSystem(4096); InetAddress address = InetAddress.getByName("127.0.0.1"); - InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort + 0); + InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort); JavaTestKit probe = new JavaTestKit(system); InstanceID i = cm.registerTaskManager(probe.getRef(), ici, resources, 1); @@ -157,7 +153,7 @@ public class InstanceManagerTest{ InetAddress address = InetAddress.getByName("127.0.0.1"); // register three instances - InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort + 0); + InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort); InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 1); InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 2); http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 4416ba6..c26b255 
100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -101,7 +101,6 @@ object TestingUtils { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs) - config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000) config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout) new TestingCluster(config) } http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java new file mode 100644 index 0000000..911edb3 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.recovery; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +public class SimpleRecoveryITCase { + + + private static ForkableFlinkMiniCluster cluster; + + @BeforeClass + public static void setupCluster() { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); + + cluster = new ForkableFlinkMiniCluster(config, false); + } + + @AfterClass + public static void teardownCluster() { + try { + cluster.stop(); + } + catch (Throwable t) { + System.err.println("Error stopping cluster on shutdown"); + t.printStackTrace(); + fail("Cluster shutdown caused an exception: " + t.getMessage()); + } + } + + @Test + public void testFailedRunThenSuccessfulRun() { + + FailOnceMapper.failuresBeforeSuccess = 1; + + try { + List<Long> resultCollection = new ArrayList<Long>(); + + // attempt 1 + { + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(0); + + env.generateSequence(1, 
10) + .map(new FailOnceMapper<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + env.execute(); + fail("The program should have failed"); + } + catch (ProgramInvocationException e) { + // expected + } + } + + // attempt 2 + { + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(0); + + env.generateSequence(1, 10) + .map(new FailOnceMapper<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + JobExecutionResult result = env.execute(); + assertTrue(result.getNetRuntime() >= 0); + assertNotNull(result.getAllAccumulatorResults()); + assertTrue(result.getAllAccumulatorResults().isEmpty()); + } + catch (JobExecutionException e) { + fail("The program should have succeeded on the second run"); + } + + long sum = 0; + for (long l : resultCollection) { + sum += l; + } + assertEquals(55, sum); + } + + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testRestart() { + + FailOnceMapper.failuresBeforeSuccess = 1; + + try { + List<Long> resultCollection = new ArrayList<Long>(); + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(1); + + env.generateSequence(1, 10) + .map(new FailOnceMapper<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + 
JobExecutionResult result = env.execute(); + assertTrue(result.getNetRuntime() >= 0); + assertNotNull(result.getAllAccumulatorResults()); + assertTrue(result.getAllAccumulatorResults().isEmpty()); + } + catch (JobExecutionException e) { + fail("The program should have succeeded on the second run"); + } + + long sum = 0; + for (long l : resultCollection) { + sum += l; + } + assertEquals(55, sum); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testRestartMultipleTimes() { + + FailOnceMapper.failuresBeforeSuccess = 3; + + try { + List<Long> resultCollection = new ArrayList<Long>(); + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + + env.setDegreeOfParallelism(4); + env.setNumberOfExecutionRetries(3); + + env.generateSequence(1, 10) + .map(new FailOnceMapper<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + try { + JobExecutionResult result = env.execute(); + assertTrue(result.getNetRuntime() >= 0); + assertNotNull(result.getAllAccumulatorResults()); + assertTrue(result.getAllAccumulatorResults().isEmpty()); + } + catch (JobExecutionException e) { + fail("The program should have succeeded on the second run"); + } + + long sum = 0; + for (long l : resultCollection) { + sum += l; + } + assertEquals(55, sum); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------------------ + + private static class FailOnceMapper<T> extends RichMapFunction<T, T> { + + private static int failuresBeforeSuccess = 0; + + @Override + public T map(T value) throws Exception { + if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) { + failuresBeforeSuccess--; + 
throw new Exception("Test Failure"); + } + + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java new file mode 100644 index 0000000..85856ba --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.recovery; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.pattern.Patterns; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class TaskManagerFailureRecoveryITCase { + + @Test + public void testRestartWithFailingTaskManager() { + + final int PARALLELISM = 4; + + ForkableFlinkMiniCluster cluster = null; + ActorSystem additionalSystem = null; + + try { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); + + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms"); + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); + config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2); + + cluster = new ForkableFlinkMiniCluster(config, false); + + // for the result + List<Long> resultCollection = new ArrayList<Long>(); + + final 
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getJobManagerRPCPort()); + + env.setDegreeOfParallelism(PARALLELISM); + env.setNumberOfExecutionRetries(1); + + env.generateSequence(1, 10) + .map(new FailingMapper<Long>()) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + .output(new LocalCollectionOutputFormat<Long>(resultCollection)); + + + // simple reference (atomic does not matter) to pass back an exception from the trigger thread + final AtomicReference<Throwable> ref = new AtomicReference<Throwable>(); + + // trigger the execution from a separate thread, so we are available to tamper with the + // cluster during the execution + Thread trigger = new Thread("program trigger") { + @Override + public void run() { + try { + env.execute(); + } + catch (Throwable t) { + ref.set(t); + } + } + }; + trigger.setDaemon(true); + trigger.start(); + + // block until all the mappers are actually deployed + // the mappers in turn are waiting + for (int i = 0; i < PARALLELISM; i++) { + FailingMapper.TASK_TO_COORD_QUEUE.take(); + } + + // bring up one more task manager and wait for it to appear + { + additionalSystem = cluster.startTaskManagerActorSystem(2); + ActorRef additionalTaskManager = cluster.startTaskManager(2, additionalSystem); + Object message = TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(); + Future<Object> future = Patterns.ask(additionalTaskManager, message, 30000); + + try { + Await.result(future, new FiniteDuration(30000, TimeUnit.MILLISECONDS)); + } + catch (TimeoutException e) { + fail ("The additional TaskManager did not come up within 30 seconds"); + } + } + + // kill the two other TaskManagers + for (ActorRef tm : cluster.getTaskManagersAsJava()) { + tm.tell(PoisonPill.getInstance(), null); + } + + // wait for the next set of mappers (the recovery ones) to come online + for (int i = 0; i < PARALLELISM; 
i++) { + FailingMapper.TASK_TO_COORD_QUEUE.take(); + } + + // tell the mappers that they may continue this time + for (int i = 0; i < PARALLELISM; i++) { + FailingMapper.COORD_TO_TASK_QUEUE.add(new Object()); + } + + // wait for the program to finish + trigger.join(); + if (ref.get() != null) { + Throwable t = ref.get(); + t.printStackTrace(); + fail("Program execution caused an exception: " + t.getMessage()); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (additionalSystem != null) { + additionalSystem.shutdown(); + } + if (cluster != null) { + cluster.stop(); + } + } + } + + private static class FailingMapper<T> extends RichMapFunction<T, T> { + + private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>(); + + private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>(); + + @Override + public void open(Configuration parameters) throws Exception { + TASK_TO_COORD_QUEUE.add(new Object()); + COORD_TO_TASK_QUEUE.take(); + } + + @Override + public T map(T value) throws Exception { + return value; + } + } +}