This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93b042da45a0bbf418e7df96059fe36eb4f18b9e Author: Till Rohrmann <[email protected]> AuthorDate: Sun Sep 23 22:05:28 2018 +0200 [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base This closes #6750. --- .../runtime/entrypoint/ClusterEntrypoint.java | 4 +- ...tractTaskManagerProcessFailureRecoveryTest.java | 134 ++++----------------- ...skManagerProcessFailureBatchRecoveryITCase.java | 4 +- ...nagerProcessFailureStreamingRecoveryITCase.java | 2 - 4 files changed, 28 insertions(+), 116 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index c9a1722..5665500 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -147,7 +147,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { return terminationFuture; } - protected void startCluster() throws ClusterEntrypointException { + public void startCluster() throws ClusterEntrypointException { LOG.info("Starting {}.", getClass().getSimpleName()); try { @@ -392,7 +392,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { return resultConfiguration; } - private CompletableFuture<ApplicationStatus> shutDownAsync( + public CompletableFuture<ApplicationStatus> shutDownAsync( ApplicationStatus applicationStatus, @Nullable String diagnostics, boolean cleanupHaData) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 5cd6f30..0962ddf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -19,29 +19,21 @@ package org.apache.flink.test.recovery; import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.util.BlobServerResource; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -52,17 +44,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.StringWriter; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; import static org.junit.Assert.assertFalse; @@ -92,6 +75,9 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public final BlobServerResource blobServerResource = new BlobServerResource(); + @Test public void testTaskManagerProcessFailure() throws Exception { @@ -99,14 +85,25 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test final StringWriter processOutput2 = new StringWriter(); final StringWriter processOutput3 = new StringWriter(); - ActorSystem jmActorSystem = null; - HighAvailabilityServices highAvailabilityServices = null; Process taskManagerProcess1 = null; Process taskManagerProcess2 = null; Process taskManagerProcess3 = null; File coordinateTempDir = null; + final int jobManagerPort = NetUtils.getAvailablePort(); + final int restPort = NetUtils.getAvailablePort(); + + Configuration jmConfig = new Configuration(); + jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + jmConfig.setString(JobManagerOptions.ADDRESS, "localhost"); + jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); + jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L); + jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L); + jmConfig.setInteger(RestOptions.PORT, restPort); + + final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig); + try { // check that we run this test only if the java command // is available on this machine @@ -124,37 +121,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test // coordination between the processes goes through a directory coordinateTempDir = temporaryFolder.newFolder(); - // find a free port to start the JobManager - final int jobManagerPort = NetUtils.getAvailablePort(); - - // start a JobManager - Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort); - - Configuration jmConfig = new Configuration(); - jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms"); - jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s"); - jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9); - jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s"); - jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); - jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1()); - jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); - - highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - jmConfig, - TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress)); - ActorRef jmActor = JobManager.startJobManagerActors( - jmConfig, - jmActorSystem, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - highAvailabilityServices, - NoOpMetricRegistry.INSTANCE, - Option.empty(), - JobManager.class, - MemoryArchivist.class)._1(); + clusterEntrypoint.startCluster(); // the TaskManager java command String[] command = new String[] { @@ -163,7 +130,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms80m", "-Xmx80m", "-classpath", getCurrentClasspath(), - TaskManagerProcessEntryPoint.class.getName(), + TaskExecutorProcessEntryPoint.class.getName(), String.valueOf(jobManagerPort) }; @@ -173,10 +140,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test taskManagerProcess2 = new ProcessBuilder(command).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); - // we wait for the JobManager to have the two TaskManagers available - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000); - // the program will set a marker file in each of its parallel tasks once they are ready, so that // this coordinating code is aware of this. // the program will very slowly consume elements until the marker file (later created by the @@ -189,7 +152,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test @Override public void run() { try { - testTaskManagerFailure(jobManagerPort, coordinateDirClosure); + testTaskManagerFailure(restPort, coordinateDirClosure); } catch (Throwable t) { t.printStackTrace(); @@ -220,10 +183,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test taskManagerProcess3 = new ProcessBuilder(command).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3); - // we wait for the third TaskManager to register - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000); - // kill one of the previous TaskManagers, triggering a failure and recovery taskManagerProcess1.destroy(); taskManagerProcess1 = null; @@ -270,13 +229,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test if (taskManagerProcess3 != null) { taskManagerProcess3.destroy(); } - if (jmActorSystem != null) { - jmActorSystem.shutdown(); - } - if (highAvailabilityServices != null) { - highAvailabilityServices.closeAndCleanupAllData(); - } + clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get(); } } @@ -290,44 +244,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test */ public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception; - protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis) - throws Exception { - final long pollInterval = 10_000_000; // 10 ms = 10,000,000 nanos - final long deadline = System.nanoTime() + maxDelayMillis * 1_000_000; - - long time; - - while ((time = System.nanoTime()) < deadline) { - FiniteDuration timeout = new FiniteDuration(pollInterval, TimeUnit.NANOSECONDS); - - try { - Future<?> result = Patterns.ask(jobManager, - JobManagerMessages.getRequestNumberRegisteredTaskManager(), - new Timeout(timeout)); - - int numTMs = (Integer) Await.result(result, timeout); - - if (numTMs == numExpected) { - return; - } - } - catch (TimeoutException e) { - // ignore and retry - } - catch (ClassCastException e) { - fail("Wrong response: " + e.getMessage()); - } - - long timePassed = System.nanoTime() - time; - long remainingMillis = (pollInterval - timePassed) / 1_000_000; - if (remainingMillis > 0) { - Thread.sleep(remainingMillis); - } - } - - fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)"); - } - protected static void printProcessLog(String processName, String log) { if (log == null || log.length() == 0) { return; diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java index 7dc6f0c..4815c49 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -68,10 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { final Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration); env.setParallelism(PARALLELISM); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); env.getConfig().setExecutionMode(executionMode); env.getConfig().disableSysoutLogging(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java index 766a799..fbf6b5b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -67,7 +66,6 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa final File tempCheckpointDir = tempFolder.newFolder(); final Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", jobManagerPort,
