This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ace721e5515a8fcc085e501bcfc8586551d1d36
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Sep 24 08:56:58 2018 +0200

    [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
    
    This closes #6751.
---
 .../flink/runtime/testutils/DispatcherProcess.java | 179 +++++++++++++++++++++
 ...ManagerHAProcessFailureBatchRecoveryITCase.java | 144 +++++++++--------
 2 files changed, 253 insertions(+), 70 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
new file mode 100644
index 0000000..79b0dc3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.jobmanager.JobManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Dispatcher} instance running in a separate JVM.
+ *
+ * <p>The {@link Configuration} passed to the constructor is flattened into
+ * {@code --key value} arguments. {@link DispatcherProcessEntryPoint} parses such
+ * arguments back into a configuration and runs a
+ * {@link StandaloneSessionClusterEntrypoint} with it.
+ */
+public class DispatcherProcess extends TestJvmProcess {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcess.class);
+
+       /** Pattern to parse the port from the "Actor system started at ..." line of the process output. */
+       private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp://flink@.*:(\\d+).*");
+
+       /** ID of this dispatcher process. */
+       private final int id;
+
+       /** The configuration for the dispatcher process. */
+       private final Configuration config;
+
+       /** Configuration rendered as arguments for {@link DispatcherProcessEntryPoint}. */
+       private final String[] jvmArgs;
+
+       /** The port parsed from the process output; stays 0 until parsing succeeded once. */
+       private int jobManagerPort;
+
+       /**
+        * Creates a {@link Dispatcher} running in a separate JVM.
+        *
+        * @param id     ID for the dispatcher process, must not be negative
+        * @param config Configuration for the dispatcher process, must not be null
+        *
+        * @throws IllegalArgumentException if the ID is negative
+        * @throws NullPointerException if the configuration is null
+        * @throws Exception declared by the signature; no statement in this constructor body
+        *                   throws a checked exception itself
+        */
+       public DispatcherProcess(int id, Configuration config) throws Exception {
+               checkArgument(id >= 0, "Negative ID");
+               this.id = id;
+               this.config = checkNotNull(config, "Configuration");
+
+               // Render every configuration entry as a "--key value" argument pair.
+               ArrayList<String> args = new ArrayList<>();
+               for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+                       args.add("--" + entry.getKey());
+                       args.add(entry.getValue());
+               }
+
+               this.jvmArgs = args.toArray(new String[0]);
+       }
+
+       @Override
+       public String getName() {
+               return "Dispatcher " + id;
+       }
+
+       @Override
+       public String[] getJvmArgs() {
+               return jvmArgs;
+       }
+
+       @Override
+       public String getEntryPointClassName() {
+               return DispatcherProcessEntryPoint.class.getName();
+       }
+
+       /**
+        * Returns the configuration this process was created with.
+        */
+       public Configuration getConfig() {
+               return config;
+       }
+
+       /**
+        * Parses the port from the process output and returns it.
+        *
+        * <p>If a call to this method succeeds, successive calls will directly
+        * return the cached port without re-parsing the output.
+        *
+        * @param timeout Timeout for log parsing.
+        * @return The port parsed from the process output
+        * @throws InterruptedException  If interrupted while waiting before
+        *                               retrying to parse the logs
+        * @throws NumberFormatException If the parsed digits do not fit into an int
+        * @throws RuntimeException      If no port could be parsed before the timeout expired
+        */
+       public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
+               if (jobManagerPort > 0) {
+                       // Already parsed by an earlier call.
+                       return jobManagerPort;
+               }
+
+               final Deadline deadline = timeout.fromNow();
+               while (deadline.hasTimeLeft()) {
+                       final Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
+                       if (matcher.find()) {
+                               jobManagerPort = Integer.parseInt(matcher.group(1));
+                               return jobManagerPort;
+                       }
+
+                       // The line has not been logged yet; poll again shortly.
+                       Thread.sleep(100);
+               }
+
+               throw new RuntimeException("Could not parse port from logs");
+       }
+
+       @Override
+       public String toString() {
+               return String.format("DispatcherProcess(id=%d, port=%d)", id, jobManagerPort);
+       }
+
+       /**
+        * Entry point for the dispatcher process.
+        */
+       public static class DispatcherProcessEntryPoint {
+
+               private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class);
+
+               /**
+                * Entrypoint of the DispatcherProcessEntryPoint.
+                *
+                * <p>All arguments are parsed to a {@link Configuration} and passed to the
+                * cluster entrypoint, for instance: <code>--high-availability ZOOKEEPER
+                * --high-availability.zookeeper.quorum "xyz:123:456"</code>.
+                *
+                * <p>Any failure is logged and terminates the JVM with exit code 1.
+                */
+               public static void main(String[] args) {
+                       try {
+                               ParameterTool params = ParameterTool.fromArgs(args);
+                               Configuration config = params.getConfiguration();
+                               LOG.info("Configuration: {}.", config);
+
+                               // Always override the JobManager and REST ports with 0.
+                               // NOTE(review): presumably this selects free ports so that several processes can share a host; confirm.
+                               config.setInteger(JobManagerOptions.PORT, 0);
+                               config.setInteger(RestOptions.PORT, 0);
+
+                               final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config);
+
+                               ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start Dispatcher process", t);
+                               System.exit(1);
+                       }
+               }
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d3accff..9e9ce07 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -22,36 +22,38 @@ import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,12 +61,15 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -90,23 +95,28 @@ import static org.junit.Assert.fail;
 @RunWith(Parameterized.class)
 public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
-       private static final ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+       private static ZooKeeperTestEnvironment zooKeeper;
 
        private static final FiniteDuration TestTimeOut = new FiniteDuration(5, 
TimeUnit.MINUTES);
 
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @AfterClass
-       public static void tearDown() throws Exception {
-               if (ZooKeeper != null) {
-                       ZooKeeper.shutdown();
-               }
+       @BeforeClass
+       public static void setup() {
+               zooKeeper = new ZooKeeperTestEnvironment(1);
        }
 
        @Before
        public void cleanUp() throws Exception {
-               ZooKeeper.deleteAll();
+               zooKeeper.deleteAll();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (zooKeeper != null) {
+                       zooKeeper.shutdown();
+               }
        }
 
        protected static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -141,7 +151,6 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
         */
        private void testJobManagerFailure(String zkQuorum, final File 
coordinateDir, final File zookeeperStoragePath) throws Exception {
                Configuration config = new Configuration();
-               config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE);
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkQuorum);
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
zookeeperStoragePath.getAbsolutePath());
@@ -149,7 +158,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
                                "leader", 1, config);
                env.setParallelism(PARALLELISM);
-               env.setNumberOfExecutionRetries(1);
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0L));
                env.getConfig().setExecutionMode(executionMode);
                env.getConfig().disableSysoutLogging();
 
@@ -212,7 +221,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
        }
 
        @Test
-       public void testJobManagerProcessFailure() throws Exception {
+       public void testDispatcherProcessFailure() throws Exception {
+               final Time timeout = Time.seconds(30L);
                final File zookeeperStoragePath = temporaryFolder.newFolder();
 
                // Config
@@ -222,15 +232,11 @@ public class 
JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
                assertEquals(PARALLELISM, numberOfTaskManagers * 
numberOfSlotsPerTaskManager);
 
-               // Setup
-               // Test actor system
-               ActorSystem testActorSystem;
-
                // Job managers
-               final JobManagerProcess[] jmProcess = new 
JobManagerProcess[numberOfJobManagers];
+               final DispatcherProcess[] dispatcherProcesses = new 
DispatcherProcess[numberOfJobManagers];
 
                // Task managers
-               final ActorSystem[] tmActorSystem = new 
ActorSystem[numberOfTaskManagers];
+               TaskManagerRunner[] taskManagerRunners = new 
TaskManagerRunner[numberOfTaskManagers];
 
                HighAvailabilityServices highAvailabilityServices = null;
 
@@ -239,24 +245,25 @@ public class 
JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
                // Coordination between the processes goes through a directory
                File coordinateTempDir = null;
 
+               // Cluster config
+               Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
+                       zooKeeper.getConnectString(), 
zookeeperStoragePath.getPath());
+               // Task manager configuration
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+               config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+               final RpcService rpcService = 
AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
                try {
                        final Deadline deadline = TestTimeOut.fromNow();
 
                        // Coordination directory
                        coordinateTempDir = temporaryFolder.newFolder();
 
-                       // Job Managers
-                       Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
-                                       ZooKeeper.getConnectString(), 
zookeeperStoragePath.getPath());
-
                        // Start first process
-                       jmProcess[0] = new JobManagerProcess(0, config);
-                       jmProcess[0].startProcess();
-
-                       // Task manager configuration
-                       
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-                       
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-                       config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+                       dispatcherProcesses[0] = new DispatcherProcess(0, 
config);
+                       dispatcherProcesses[0].startProcess();
 
                        highAvailabilityServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                                config,
@@ -264,27 +271,13 @@ public class 
JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 
                        // Start the task manager process
                        for (int i = 0; i < numberOfTaskManagers; i++) {
-                               tmActorSystem[i] = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-                               TaskManager.startTaskManagerComponentsAndActor(
-                                       config,
-                                       ResourceID.generate(),
-                                       tmActorSystem[i],
-                                       highAvailabilityServices,
-                                       NoOpMetricRegistry.INSTANCE,
-                                       "localhost",
-                                       Option.<String>empty(),
-                                       false,
-                                       TaskManager.class);
+                               taskManagerRunners[i] = new 
TaskManagerRunner(config, ResourceID.generate());
+                               taskManagerRunners[i].start();
                        }
 
-                       // Test actor system
-                       testActorSystem = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-
-                       jmProcess[0].getActorRef(testActorSystem, 
deadline.timeLeft());
-
                        // Leader listener
                        TestingListener leaderListener = new TestingListener();
-                       leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
+                       leaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
                        leaderRetrievalService.start(leaderListener);
 
                        // Initial submission
@@ -293,13 +286,14 @@ public class 
JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
                        String leaderAddress = leaderListener.getAddress();
                        UUID leaderId = leaderListener.getLeaderSessionID();
 
-                       // Get the leader ref
-                       ActorRef leaderRef = 
AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
-                       ActorGateway leaderGateway = new 
AkkaActorGateway(leaderRef, leaderId);
+                       final CompletableFuture<DispatcherGateway> 
dispatcherGatewayFuture = rpcService.connect(
+                               leaderAddress,
+                               DispatcherId.fromUuid(leaderId),
+                               DispatcherGateway.class);
+                       final DispatcherGateway dispatcherGateway = 
dispatcherGatewayFuture.get();
 
                        // Wait for all task managers to connect to the leading 
job manager
-                       
JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, 
leaderGateway,
-                                       deadline.timeLeft());
+                       waitForTaskManagers(numberOfTaskManagers, 
dispatcherGateway, deadline.timeLeft());
 
                        final File coordinateDirClosure = coordinateTempDir;
                        final Throwable[] errorRef = new Throwable[1];
@@ -309,7 +303,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                                @Override
                                public void run() {
                                        try {
-                                               
testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, 
zookeeperStoragePath);
+                                               
testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, 
zookeeperStoragePath);
                                        }
                                        catch (Throwable t) {
                                                t.printStackTrace();
@@ -326,12 +320,10 @@ public class 
JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
                                        READY_MARKER_FILE_PREFIX, PARALLELISM, 
deadline.timeLeft().toMillis());
 
                        // Kill one of the job managers and trigger recovery
-                       jmProcess[0].destroy();
+                       dispatcherProcesses[0].destroy();
 
-                       jmProcess[1] = new JobManagerProcess(1, config);
-                       jmProcess[1].startProcess();
-
-                       jmProcess[1].getActorRef(testActorSystem, 
deadline.timeLeft());
+                       dispatcherProcesses[1] = new DispatcherProcess(1, 
config);
+                       dispatcherProcesses[1].startProcess();
 
                        // we create the marker file which signals the program 
functions tasks that they can complete
                        
AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new 
File(coordinateTempDir, PROCEED_MARKER_FILE));
@@ -358,7 +350,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                        // for Travis and the root problem is not shown)
                        t.printStackTrace();
 
-                       for (JobManagerProcess p : jmProcess) {
+                       for (DispatcherProcess p : dispatcherProcesses) {
                                if (p != null) {
                                        p.printProcessLog();
                                }
@@ -368,8 +360,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                }
                finally {
                        for (int i = 0; i < numberOfTaskManagers; i++) {
-                               if (tmActorSystem[i] != null) {
-                                       tmActorSystem[i].shutdown();
+                               if (taskManagerRunners[i] != null) {
+                                       taskManagerRunners[i].close();
                                }
                        }
 
@@ -377,7 +369,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                                leaderRetrievalService.stop();
                        }
 
-                       for (JobManagerProcess jmProces : jmProcess) {
+                       for (DispatcherProcess jmProces : dispatcherProcesses) {
                                if (jmProces != null) {
                                        jmProces.destroy();
                                }
@@ -387,6 +379,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                                
highAvailabilityServices.closeAndCleanupAllData();
                        }
 
+                       RpcUtils.terminateRpcService(rpcService, timeout);
+
                        // Delete coordination directory
                        if (coordinateTempDir != null) {
                                try {
@@ -398,4 +392,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                }
        }
 
+       /**
+        * Blocks until the dispatcher's cluster overview reports at least the given number of
+        * connected task managers.
+        *
+        * <p>The overview is requested through {@code FutureUtils.retrySuccesfulWithDelay} with a
+        * delay of 50 ms and a deadline derived from {@code timeLeft}.
+        *
+        * @param numberOfTaskManagers minimum number of connected task managers to wait for
+        * @param dispatcherGateway gateway used to request the cluster overview
+        * @param timeLeft time budget for the wait; also used as the timeout of each request
+        * @throws ExecutionException if the retry future completes exceptionally
+        * @throws InterruptedException if interrupted while waiting for the result
+        */
+       private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException {
+               final long timeLeftMillis = timeLeft.toMillis();
+
+               // Dedicated scheduler for the retries; it is shut down below so that its thread does not outlive this call.
+               final java.util.concurrent.ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
+
+               try {
+                       FutureUtils.retrySuccesfulWithDelay(
+                               () -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeftMillis)),
+                               Time.milliseconds(50L),
+                               org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeftMillis)),
+                               clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers,
+                               new ScheduledExecutorServiceAdapter(retryExecutor))
+                               .get();
+               } finally {
+                       retryExecutor.shutdownNow();
+               }
+       }
+
 }

Reply via email to