[FLINK-8843][REST] Decouple bind REST address from advertised address

By default bind REST server on wildcard address.
Rename RestServerEndpoint#getRestAddress to getRestBaseUrl.

This closes #5707.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efd7336f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efd7336f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efd7336f

Branch: refs/heads/master
Commit: efd7336fa693a9f82b9ecfb5d81c0ef747ab7801
Parents: 0caff35
Author: gyao <[email protected]>
Authored: Thu Mar 15 22:04:58 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Mar 18 15:58:14 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/configuration/RestOptions.java | 15 +++++--
 .../runtime/entrypoint/ClusterEntrypoint.java   |  4 +-
 .../HighAvailabilityServicesUtils.java          |  6 ++-
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +-
 .../minicluster/MiniClusterConfiguration.java   |  4 ++
 .../flink/runtime/rest/RestServerEndpoint.java  | 46 +++++++++++---------
 .../rest/RestServerEndpointConfiguration.java   | 32 ++++++++++----
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  6 +--
 .../runtime/rest/RestServerEndpointITCase.java  |  3 +-
 9 files changed, 80 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 94d7977..e7421c4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -29,12 +29,21 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
 public class RestOptions {
 
        /**
-        * The address that the server binds itself to / the client connects to.
+        * The address that the server binds itself to.
+        */
+       public static final ConfigOption<String> REST_BIND_ADDRESS =
+               key("rest.bind-address")
+                       .noDefaultValue()
+                       .withDescription("The address that the server binds 
itself.");
+
+       /**
+        * The address that should be used by clients to connect to the server.
         */
        public static final ConfigOption<String> REST_ADDRESS =
                key("rest.address")
-                       .defaultValue("localhost")
-                       .withDescription("The address that the server binds 
itself to / the client connects to.");
+                       .noDefaultValue()
+                       .withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
+                       .withDescription("The address that should be used by 
clients to connect to the server.");
 
        /**
         * The port that the server listens on / the client connects to.

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 676415b..63c8072 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -330,7 +330,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                metricRegistry,
                                this,
                                clusterInformation,
-                               webMonitorEndpoint.getRestAddress());
+                               webMonitorEndpoint.getRestBaseUrl());
 
                        jobManagerMetricGroup = 
MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, 
rpcService.getAddress());
 
@@ -345,7 +345,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                                metricRegistry.getMetricQueryServicePath(),
                                archivedExecutionGraphStore,
                                this,
-                               webMonitorEndpoint.getRestAddress());
+                               webMonitorEndpoint.getRestBaseUrl());
 
                        LOG.debug("Starting ResourceManager.");
                        resourceManager.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 4f12f2b..f19a421 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -39,6 +39,8 @@ import org.apache.flink.util.ConfigurationException;
 
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utils class to instantiate {@link HighAvailabilityServices} implementations.
  */
@@ -97,7 +99,9 @@ public class HighAvailabilityServicesUtils {
                                        addressResolution,
                                        configuration);
 
-                               final String address = 
configuration.getString(RestOptions.REST_ADDRESS);
+                               final String address = 
checkNotNull(configuration.getString(RestOptions.REST_ADDRESS),
+                                       "%s must be set",
+                                       RestOptions.REST_ADDRESS.key());
                                final int port = 
configuration.getInteger(RestOptions.REST_PORT);
                                final boolean enableSSL = 
configuration.getBoolean(SecurityOptions.SSL_ENABLED);
                                final String protocol = enableSSL ? "https://" 
: "http://";

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 74aa388..dfe30af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -341,7 +341,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
 
                                dispatcherRestEndpoint.start();
 
-                               restAddressURI = new 
URI(dispatcherRestEndpoint.getRestAddress());
+                               restAddressURI = new 
URI(dispatcherRestEndpoint.getRestBaseUrl());
 
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
                                LOG.info("Starting job dispatcher(s) for 
JobManger");
@@ -361,7 +361,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                                        new MemoryArchivedExecutionGraphStore(),
                                        
Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                                        new ShutDownFatalErrorHandler(),
-                                       
dispatcherRestEndpoint.getRestAddress());
+                                       
dispatcherRestEndpoint.getRestBaseUrl());
 
                                dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 08af0c4..fe76694 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
@@ -167,6 +168,9 @@ public class MiniClusterConfiguration {
                public MiniClusterConfiguration build() {
                        final Configuration modifiedConfiguration = new 
Configuration(configuration);
                        
modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
+                       modifiedConfiguration.setString(
+                               RestOptions.REST_ADDRESS,
+                               
modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost"));
 
                        return new MiniClusterConfiguration(
                                modifiedConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index a3d4843..dfb01ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -73,8 +73,9 @@ public abstract class RestServerEndpoint {
 
        private final Object lock = new Object();
 
-       private final String configuredAddress;
-       private final int configuredPort;
+       private final String restAddress;
+       private final String restBindAddress;
+       private final int restBindPort;
        private final SSLEngine sslEngine;
        private final Path uploadDir;
        private final int maxContentLength;
@@ -84,14 +85,16 @@ public abstract class RestServerEndpoint {
 
        private ServerBootstrap bootstrap;
        private Channel serverChannel;
-       private String restAddress;
+       private String restBaseUrl;
 
        private State state = State.CREATED;
 
        public RestServerEndpoint(RestServerEndpointConfiguration 
configuration) throws IOException {
                Preconditions.checkNotNull(configuration);
-               this.configuredAddress = configuration.getEndpointBindAddress();
-               this.configuredPort = configuration.getEndpointBindPort();
+
+               this.restAddress = configuration.getRestAddress();
+               this.restBindAddress = configuration.getRestBindAddress();
+               this.restBindPort = configuration.getRestBindPort();
                this.sslEngine = configuration.getSslEngine();
 
                this.uploadDir = configuration.getUploadDir();
@@ -101,8 +104,6 @@ public abstract class RestServerEndpoint {
                this.responseHeaders = configuration.getResponseHeaders();
 
                terminationFuture = new CompletableFuture<>();
-
-               this.restAddress = null;
        }
 
        /**
@@ -176,18 +177,23 @@ public abstract class RestServerEndpoint {
                                .childHandler(initializer);
 
                        final ChannelFuture channel;
-                       if (configuredAddress == null) {
-                               channel = bootstrap.bind(configuredPort);
+                       if (restBindAddress == null) {
+                               channel = bootstrap.bind(restBindPort);
                        } else {
-                               channel = bootstrap.bind(configuredAddress, 
configuredPort);
+                               channel = bootstrap.bind(restBindAddress, 
restBindPort);
                        }
                        serverChannel = channel.syncUninterruptibly().channel();
 
-                       InetSocketAddress bindAddress = (InetSocketAddress) 
serverChannel.localAddress();
-                       String address = 
bindAddress.getAddress().getHostAddress();
-                       int port = bindAddress.getPort();
+                       final InetSocketAddress bindAddress = 
(InetSocketAddress) serverChannel.localAddress();
+                       final String advertisedAddress;
+                       if (bindAddress.getAddress().isAnyLocalAddress()) {
+                               advertisedAddress = this.restAddress;
+                       } else {
+                               advertisedAddress = 
bindAddress.getAddress().getHostAddress();
+                       }
+                       final int port = bindAddress.getPort();
 
-                       log.info("Rest endpoint listening at {}:{}", address, 
port);
+                       log.info("Rest endpoint listening at {}:{}", 
advertisedAddress, port);
 
                        final String protocol;
 
@@ -197,9 +203,9 @@ public abstract class RestServerEndpoint {
                                protocol = "http://";
                        }
 
-                       restAddress = protocol + address + ':' + port;
+                       restBaseUrl = protocol + advertisedAddress + ':' + port;
 
-                       restAddressFuture.complete(restAddress);
+                       restAddressFuture.complete(restBaseUrl);
 
                        state = State.RUNNING;
 
@@ -238,14 +244,14 @@ public abstract class RestServerEndpoint {
        }
 
        /**
-        * Returns the address of the REST server endpoint.
+        * Returns the base URL of the REST server endpoint.
         *
-        * @return REST address of this endpoint
+        * @return REST base URL of this endpoint
         */
-       public String getRestAddress() {
+       public String getRestBaseUrl() {
                synchronized (lock) {
                        Preconditions.checkState(state != State.CREATED, "The 
RestServerEndpoint has not been started yet.");
-                       return restAddress;
+                       return restBaseUrl;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 35bd6ea..1fac08e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -45,6 +45,8 @@ import static java.util.Objects.requireNonNull;
  */
 public final class RestServerEndpointConfiguration {
 
+       private final String restAddress;
+
        @Nullable
        private final String restBindAddress;
 
@@ -60,6 +62,7 @@ public final class RestServerEndpointConfiguration {
        private final Map<String, String> responseHeaders;
 
        private RestServerEndpointConfiguration(
+                       final String restAddress,
                        @Nullable String restBindAddress,
                        int restBindPort,
                        @Nullable SSLEngine sslEngine,
@@ -69,12 +72,20 @@ public final class RestServerEndpointConfiguration {
                Preconditions.checkArgument(0 <= restBindPort && restBindPort < 
65536, "The bing rest port " + restBindPort + " is out of range (0, 65536[");
                Preconditions.checkArgument(maxContentLength > 0, 
"maxContentLength must be positive, was: %d", maxContentLength);
 
+               this.restAddress = requireNonNull(restAddress);
                this.restBindAddress = restBindAddress;
                this.restBindPort = restBindPort;
                this.sslEngine = sslEngine;
                this.uploadDir = requireNonNull(uploadDir);
                this.maxContentLength = maxContentLength;
-               this.responseHeaders = 
requireNonNull(Collections.unmodifiableMap(responseHeaders));
+               this.responseHeaders = 
Collections.unmodifiableMap(requireNonNull(responseHeaders));
+       }
+
+       /**
+        * @see RestOptions#REST_ADDRESS
+        */
+       public String getRestAddress() {
+               return restAddress;
        }
 
        /**
@@ -82,7 +93,7 @@ public final class RestServerEndpointConfiguration {
         *
         * @return address that the REST server endpoint should bind itself to
         */
-       public String getEndpointBindAddress() {
+       public String getRestBindAddress() {
                return restBindAddress;
        }
 
@@ -91,7 +102,7 @@ public final class RestServerEndpointConfiguration {
         *
         * @return port that the REST server endpoint should listen on
         */
-       public int getEndpointBindPort() {
+       public int getRestBindPort() {
                return restBindPort;
        }
 
@@ -136,12 +147,16 @@ public final class RestServerEndpointConfiguration {
         */
        public static RestServerEndpointConfiguration 
fromConfiguration(Configuration config) throws ConfigurationException {
                Preconditions.checkNotNull(config);
-               String address = config.getString(RestOptions.REST_ADDRESS);
 
-               int port = config.getInteger(RestOptions.REST_PORT);
+               final String restAddress = 
Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS),
+                       "%s must be set",
+                       RestOptions.REST_ADDRESS.key());
+
+               final String restBindAddress = 
config.getString(RestOptions.REST_BIND_ADDRESS);
+               final int port = config.getInteger(RestOptions.REST_PORT);
 
                SSLEngine sslEngine = null;
-               boolean enableSSL = 
config.getBoolean(SecurityOptions.SSL_ENABLED);
+               final boolean enableSSL = 
config.getBoolean(SecurityOptions.SSL_ENABLED);
                if (enableSSL) {
                        try {
                                SSLContext sslContext = 
SSLUtils.createSSLServerContext(config);
@@ -159,14 +174,15 @@ public final class RestServerEndpointConfiguration {
                        config.getString(WebOptions.UPLOAD_DIR, 
config.getString(WebOptions.TMP_DIR)),
                        "flink-web-upload-" + UUID.randomUUID());
 
-               int maxContentLength = 
config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+               final int maxContentLength = 
config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
 
                final Map<String, String> responseHeaders = 
Collections.singletonMap(
                        HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
                        
config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
 
                return new RestServerEndpointConfiguration(
-                       address,
+                       restAddress,
+                       restBindAddress,
                        port,
                        sslEngine,
                        uploadDir,

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index dfb2fc8..50ad7eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -666,18 +666,18 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
 
        @Override
        public void grantLeadership(final UUID leaderSessionID) {
-               log.info("{} was granted leadership with leaderSessionID={}", 
getRestAddress(), leaderSessionID);
+               log.info("{} was granted leadership with leaderSessionID={}", 
getRestBaseUrl(), leaderSessionID);
                leaderElectionService.confirmLeaderSessionID(leaderSessionID);
        }
 
        @Override
        public void revokeLeadership() {
-               log.info("{} lost leadership", getRestAddress());
+               log.info("{} lost leadership", getRestBaseUrl());
        }
 
        @Override
        public String getAddress() {
-               return getRestAddress();
+               return getRestBaseUrl();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 32f3ec8..784c141 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -115,6 +115,7 @@ public class RestServerEndpointITCase extends TestLogger {
        public void setup() throws Exception {
                Configuration config = new Configuration();
                config.setInteger(RestOptions.REST_PORT, 0);
+               config.setString(RestOptions.REST_ADDRESS, "localhost");
                config.setString(WebOptions.UPLOAD_DIR, 
temporaryFolder.newFolder().getCanonicalPath());
                config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, 
TEST_REST_MAX_CONTENT_LENGTH);
                config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, 
TEST_REST_MAX_CONTENT_LENGTH);
@@ -335,7 +336,7 @@ public class RestServerEndpointITCase extends TestLogger {
 
        private HttpURLConnection openHttpConnectionForUpload(final String 
boundary) throws IOException {
                final HttpURLConnection connection =
-                       (HttpURLConnection) new 
URL(serverEndpoint.getRestAddress() + "/upload").openConnection();
+                       (HttpURLConnection) new 
URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
                connection.setDoOutput(true);
                connection.setRequestProperty("Content-Type", 
"multipart/form-data; boundary=" + boundary);
                return connection;

Reply via email to