This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d5aa97f  [FLINK-10511][Cluster Management] Reuse the port selection 
and RPC service creation in JM and TM
d5aa97f is described below

commit d5aa97f8a0e653125d97846d67f3fdf6a10dbedf
Author: Shimin Yang <[email protected]>
AuthorDate: Mon Oct 15 17:31:13 2018 +0800

    [FLINK-10511][Cluster Management] Reuse the port selection and RPC service 
creation in JM and TM
    
    This commit adds an overloaded method to AkkaRpcServiceUtils that creates
    an RPC service for a specific port range, and removes the port selection
    logic from TaskManagerRunner. In addition, AkkaRpcServiceUtils now delegates
    the Akka config generation to BootstrapTools.
    
    This closes #6845.
---
 .../runtime/entrypoint/ClusterEntrypoint.java      | 20 +++---
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      | 71 +++++++++++-----------
 .../runtime/taskexecutor/TaskManagerRunner.java    | 38 +-----------
 3 files changed, 42 insertions(+), 87 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f51bf69..35fad32 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -50,7 +49,7 @@ import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -65,6 +64,7 @@ import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -83,8 +83,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -277,6 +275,11 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                }
        }
 
+       @Nonnull
+       private RpcService createRpcService(Configuration configuration, String 
bindAddress, String portRange) throws Exception {
+               return AkkaRpcServiceUtils.createRpcService(bindAddress, 
portRange, configuration);
+       }
+
        /**
         * Returns the port range for the common {@link RpcService}.
         *
@@ -291,15 +294,6 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                }
        }
 
-       protected RpcService createRpcService(
-                       Configuration configuration,
-                       String bindAddress,
-                       String portRange) throws Exception {
-               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
-               FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-               return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
-       }
-
        protected HighAvailabilityServices createHaServices(
                Configuration configuration,
                Executor executor) throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641..43a52bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -30,11 +31,11 @@ import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -64,6 +65,24 @@ public class AkkaRpcServiceUtils {
         * Utility method to create RPC service from configuration and 
hostname, port.
         *
         * @param hostname   The hostname/address that describes the 
TaskManager's data location.
+        * @param portRangeDefinition   The port range to start TaskManager on.
+        * @param configuration                 The configuration for the 
TaskManager.
+        * @return   The rpc service which is used to start and connect to the 
TaskManager RpcEndpoint .
+        * @throws IOException      Thrown, if the actor system can not bind to 
the address
+        * @throws Exception      Thrown, if some other error occurs while 
creating akka actor system
+        */
+       public static RpcService createRpcService(
+                       String hostname,
+                       String portRangeDefinition,
+                       Configuration configuration) throws Exception {
+               final ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, 
LOG);
+               return instantiateAkkaRpcService(configuration, actorSystem);
+       }
+
+       /**
+        * Utility method to create RPC service from configuration and 
hostname, port.
+        *
+        * @param hostname   The hostname/address that describes the 
TaskManager's data location.
         * @param port           If true, the TaskManager will not initiate the 
TCP network stack.
         * @param configuration                 The configuration for the 
TaskManager.
         * @return   The rpc service which is used to start and connect to the 
TaskManager RpcEndpoint .
@@ -71,39 +90,15 @@ public class AkkaRpcServiceUtils {
         * @throws Exception      Thrown is some other error occurs while 
creating akka actor system
         */
        public static RpcService createRpcService(
-               String hostname,
-               int port,
-               Configuration configuration) throws Exception {
-               LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
-
-               final ActorSystem actorSystem;
-
-               try {
-                       Config akkaConfig;
-
-                       if (hostname != null && !hostname.isEmpty()) {
-                               // remote akka config
-                               akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, hostname, port);
-                       } else {
-                               // local akka config
-                               akkaConfig = 
AkkaUtils.getAkkaConfig(configuration);
-                       }
-
-                       LOG.debug("Using akka configuration \n {}.", 
akkaConfig);
-
-                       actorSystem = AkkaUtils.createActorSystem(akkaConfig);
-               } catch (Throwable t) {
-                       if (t instanceof ChannelException) {
-                               Throwable cause = t.getCause();
-                               if (cause != null && t.getCause() instanceof 
java.net.BindException) {
-                                       String address = 
NetUtils.hostAndPortToUrlString(hostname, port);
-                                       throw new IOException("Unable to bind 
AkkaRpcService actor system to address " +
-                                               address + " - " + 
cause.getMessage(), t);
-                               }
-                       }
-                       throw new Exception("Could not create TaskManager actor 
system", t);
-               }
+                       String hostname,
+                       int port,
+                       Configuration configuration) throws Exception {
+               final ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, hostname, port, LOG);
+               return instantiateAkkaRpcService(configuration, actorSystem);
+       }
 
+       @Nonnull
+       private static RpcService instantiateAkkaRpcService(Configuration 
configuration, ActorSystem actorSystem) {
                final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
                return new AkkaRpcService(actorSystem, timeout);
        }
@@ -144,14 +139,13 @@ public class AkkaRpcServiceUtils {
        }
 
        /**
-        * 
         * @param hostname The hostname or address where the target RPC service 
is listening.
         * @param port The port where the target RPC service is listening.
         * @param endpointName The name of the RPC endpoint.
         * @param addressResolution Whether to try address resolution of the 
given hostname or not.
         *                          This allows to fail fast in case that the 
hostname cannot be resolved.
         * @param akkaProtocol True, if security/encryption is enabled, false 
otherwise.
-        * 
+        *
         * @return The RPC URL of the specified RPC endpoint.
         */
        public static String getRpcUrl(
@@ -178,6 +172,9 @@ public class AkkaRpcServiceUtils {
                return String.format("%s://flink@%s/user/%s", protocolPrefix, 
hostPort, endpointName);
        }
 
+       /**
+        * Whether to use TCP or encrypted TCP for Akka.
+        */
        public enum AkkaProtocol {
                TCP,
                SSL_TCP
@@ -204,6 +201,6 @@ public class AkkaRpcServiceUtils {
 
        // 
------------------------------------------------------------------------
 
-       /** This class is not meant to be instantiated */
+       /** This class is not meant to be instantiated. */
        private AkkaRpcServiceUtils() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 15fa415..1f3818a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -56,7 +56,6 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.NetUtils;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
@@ -64,11 +63,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.net.BindException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -426,39 +423,6 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                }
 
                final String portRangeDefinition = 
configuration.getString(TaskManagerOptions.RPC_PORT);
-
-               return bindWithPort(configuration, taskManagerHostname, 
portRangeDefinition);
-       }
-
-       private static RpcService bindWithPort(
-               Configuration configuration,
-               String taskManagerHostname,
-               String portRangeDefinition) throws Exception{
-
-               // parse port range definition and create port iterator
-               Iterator<Integer> portsIterator;
-               try {
-                       portsIterator = 
NetUtils.getPortRangeFromString(portRangeDefinition);
-               } catch (Exception e) {
-                       throw new IllegalArgumentException("Invalid port range 
definition: " + portRangeDefinition);
-               }
-
-               while (portsIterator.hasNext()) {
-                       try {
-                               return 
AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), 
configuration);
-                       }
-                       catch (Exception e) {
-                               // we can continue to try if this contains a 
netty channel exception
-                               Throwable cause = e.getCause();
-                               if (!(cause instanceof 
org.jboss.netty.channel.ChannelException ||
-                                       cause instanceof 
java.net.BindException)) {
-                                       throw e;
-                               } // else fall through the loop and try the 
next port
-                       }
-               }
-
-               // if we come here, we have exhausted the port range
-               throw new BindException("Could not start task manager on any 
port in port range "
-                       + portRangeDefinition);
+               return 
AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, 
configuration);
        }
 }

Reply via email to