[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15190949#comment-15190949 ]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r55831570
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import com.typesafe.config.Config;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.BindException;
+import java.net.ServerSocket;
+import java.util.Iterator;
+
+/**
+ * Tools for starting JobManager and TaskManager processes, including the
+ * Actor Systems used to run the JobManager and TaskManager actors.
+ */
+public class BootstrapTools {
+
+ /**
+ * Starts an ActorSystem with the given configuration listening at the address/ports.
+ * @param configuration The Flink configuration
+ * @param listeningAddress The address to listen at.
+ * @param portRangeDefinition The port range to choose a port from.
+ * @param logger The logger to output log information.
+ * @return The ActorSystem which has been started
+ * @throws Exception
+ */
+ public static ActorSystem startActorSystem(Configuration configuration,
+ String listeningAddress,
+ String portRangeDefinition,
+ Logger logger) throws Exception {
+
+ // parse port range definition and create port iterator
+ Iterator<Integer> portsIterator;
+ try {
+ portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition);
+ }
+
+ while (portsIterator.hasNext()) {
+ // first, we check if the port is available by opening a socket
+ // if the actor system fails to start on the port, we try further
+ ServerSocket availableSocket = NetUtils.createSocketFromPorts(
+ portsIterator,
+ new NetUtils.SocketFactory() {
+ @Override
+ public ServerSocket createSocket(int port) throws IOException {
+ return new ServerSocket(port);
+ }
+ });
+
+ int port;
+ if (availableSocket == null) {
+ throw new BindException("Unable to allocate further port in port range: " + portRangeDefinition);
+ } else {
+ port = availableSocket.getLocalPort();
+ try {
+ availableSocket.close();
+ } catch (IOException ignored) {}
+ }
+
+ try {
+ return startActorSystem(configuration, listeningAddress, port, logger);
+ }
+ catch (Exception e) {
+ // we can continue to try if this contains a netty channel exception
+ Throwable cause = e.getCause();
+ if (!(cause instanceof org.jboss.netty.channel.ChannelException ||
+ cause instanceof java.net.BindException)) {
+ throw e;
+ } // else fall through the loop and try the next port
+ }
+ }
+
+ // if we come here, we have exhausted the port range
+ throw new BindException("Could not start actor system on any port in port range "
+ + portRangeDefinition);
+ }
+
+ /**
+ * Starts an Actor System at a specific port.
+ * @param configuration The Flink configuration.
+ * @param listeningAddress The address to listen at.
+ * @param listeningPort The port to listen at.
+ * @param logger the logger to output log information.
+ * @return The ActorSystem which has been started.
+ * @throws Exception
+ */
+ public static ActorSystem startActorSystem(
+ Configuration configuration,
+ String listeningAddress,
+ int listeningPort,
+ Logger logger) throws Exception {
+
+ String hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort);
+ logger.info("Trying to start actor system at " + hostPortUrl);
+
+ try {
+ Config akkaConfig = AkkaUtils.getAkkaConfig(
+ configuration,
+ new scala.Some<>(new scala.Tuple2<String, Object>(listeningAddress, listeningPort))
+ );
+ if (logger.isDebugEnabled()) {
+ logger.debug("Using akka configuration\n " + akkaConfig);
+ }
+
+ ActorSystem actorSystem = AkkaUtils.createActorSystem(akkaConfig);
+ logger.info("Actor system started at " + hostPortUrl);
--- End diff --
That's how I prefer it as well. Must have been copied over when the helper
functions were created.
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian JIRA (v6.3.4#6332)