[
https://issues.apache.org/jira/browse/FLINK-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654708#comment-16654708
]
ASF GitHub Bot commented on FLINK-10511:
----------------------------------------
asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the
port selection and RPC se…
URL: https://github.com/apache/flink/pull/6845
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f528bc4bcc4..cc12ff5ee5c 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,7 +33,6 @@
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -50,7 +49,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -83,10 +82,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import scala.concurrent.duration.FiniteDuration;
-
-import static
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
/**
* Base class for the Flink cluster entry points.
*
@@ -252,7 +247,7 @@ protected void initializeServices(Configuration
configuration) throws Exception
final String bindAddress =
configuration.getString(JobManagerOptions.ADDRESS);
final String portRange = getRPCPortRange(configuration);
- commonRpcService = createRpcService(configuration,
bindAddress, portRange);
+ commonRpcService =
AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
// update the configuration used to create the high
availability services
configuration.setString(JobManagerOptions.ADDRESS,
commonRpcService.getAddress());
@@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration
configuration) {
}
}
- protected RpcService createRpcService(
- Configuration configuration,
- String bindAddress,
- String portRange) throws Exception {
- ActorSystem actorSystem =
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG,
FORK_JOIN_EXECUTOR);
- FiniteDuration duration = AkkaUtils.getTimeout(configuration);
- return new AkkaRpcService(actorSystem,
Time.of(duration.length(), duration.unit()));
- }
-
protected HighAvailabilityServices createHaServices(
Configuration configuration,
Executor executor) throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..28f04f7677d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
import org.apache.flink.runtime.net.SSLUtils;
@@ -30,8 +31,6 @@
import org.apache.flink.util.Preconditions;
import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +59,25 @@
// RPC instantiation
//
------------------------------------------------------------------------
+ /**
+ * Utility method to create RPC service from configuration and
hostname, port.
+ *
+ * @param hostname The hostname/address that describes the
TaskManager's data location.
+ * @param portRangeDefinition The port range to start TaskManager on.
+ * @param configuration The configuration for the
TaskManager.
+ * @return The rpc service which is used to start and connect to the
TaskManager RpcEndpoint .
+ * @throws IOException Thrown, if the actor system can not bind to
the address
+ * @throws Exception Thrown is some other error occurs while
creating akka actor system
+ */
+ public static RpcService createRpcService(
+ String hostname,
+ String portRangeDefinition,
+ Configuration configuration) throws Exception {
+ final ActorSystem actorSystem =
BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition,
LOG);
+ final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
+ return new AkkaRpcService(actorSystem, timeout);
+ }
+
/**
* Utility method to create RPC service from configuration and
hostname, port.
*
@@ -75,35 +93,7 @@ public static RpcService createRpcService(
int port,
Configuration configuration) throws Exception {
LOG.info("Starting AkkaRpcService at {}.",
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
-
- final ActorSystem actorSystem;
-
- try {
- Config akkaConfig;
-
- if (hostname != null && !hostname.isEmpty()) {
- // remote akka config
- akkaConfig =
AkkaUtils.getAkkaConfig(configuration, hostname, port);
- } else {
- // local akka config
- akkaConfig =
AkkaUtils.getAkkaConfig(configuration);
- }
-
- LOG.debug("Using akka configuration \n {}.",
akkaConfig);
-
- actorSystem = AkkaUtils.createActorSystem(akkaConfig);
- } catch (Throwable t) {
- if (t instanceof ChannelException) {
- Throwable cause = t.getCause();
- if (cause != null && t.getCause() instanceof
java.net.BindException) {
- String address =
NetUtils.hostAndPortToUrlString(hostname, port);
- throw new IOException("Unable to bind
AkkaRpcService actor system to address " +
- address + " - " +
cause.getMessage(), t);
- }
- }
- throw new Exception("Could not create TaskManager actor
system", t);
- }
-
+ final ActorSystem actorSystem =
BootstrapTools.startActorSystem(configuration, hostname, port, LOG);
final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
return new AkkaRpcService(actorSystem, timeout);
}
@@ -144,7 +134,6 @@ public static String getRpcUrl(
}
/**
- *
* @param hostname The hostname or address where the target RPC service
is listening.
* @param port The port where the target RPC service is listening.
* @param endpointName The name of the RPC endpoint.
@@ -204,6 +193,6 @@ public static String createRandomName(String prefix) {
//
------------------------------------------------------------------------
- /** This class is not meant to be instantiated */
+ /** This class is not meant to be instantiated. */
private AkkaRpcServiceUtils() {}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 15fa4150b1f..2f54172bf6a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -56,7 +56,6 @@
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.NetUtils;
import akka.actor.ActorSystem;
import org.slf4j.Logger;
@@ -68,7 +67,6 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -426,39 +424,14 @@ public static RpcService createRpcService(
}
final String portRangeDefinition =
configuration.getString(TaskManagerOptions.RPC_PORT);
-
- return bindWithPort(configuration, taskManagerHostname,
portRangeDefinition);
- }
-
- private static RpcService bindWithPort(
- Configuration configuration,
- String taskManagerHostname,
- String portRangeDefinition) throws Exception{
-
- // parse port range definition and create port iterator
- Iterator<Integer> portsIterator;
try {
- portsIterator =
NetUtils.getPortRangeFromString(portRangeDefinition);
+ return
AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition,
configuration);
} catch (Exception e) {
- throw new IllegalArgumentException("Invalid port range
definition: " + portRangeDefinition);
- }
-
- while (portsIterator.hasNext()) {
- try {
- return
AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(),
configuration);
- }
- catch (Exception e) {
- // we can continue to try if this contains a
netty channel exception
- Throwable cause = e.getCause();
- if (!(cause instanceof
org.jboss.netty.channel.ChannelException ||
- cause instanceof
java.net.BindException)) {
- throw e;
- } // else fall through the loop and try the
next port
+ if (e instanceof BindException) {
+ throw new BindException("Could not start task
manager on any port in port range "
+ + portRangeDefinition);
}
+ throw e;
}
-
- // if we come here, we have exhausted the port range
- throw new BindException("Could not start task manager on any
port in port range "
- + portRangeDefinition);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Code duplication of creating RPC service in ClusterEntrypoint and
> AkkaRpcServiceUtils
> -------------------------------------------------------------------------------------
>
> Key: FLINK-10511
> URL: https://issues.apache.org/jira/browse/FLINK-10511
> Project: Flink
> Issue Type: Improvement
> Components: Cluster Management
> Reporter: Shimin Yang
> Assignee: Shimin Yang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The TaskManagerRunner is using AkkaRpcServiceUtils to create RPC service, but
> the ClusterEntrypoint use a protected method to do the same job. I think it's
> better to use the same method in AkkaRpcServiceUtils for reuse of code.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)