Repository: flink
Updated Branches:
  refs/heads/master a725910aa -> 718f6e4e3


[FLINK-4876] [webfrontend] Allow to bind to a specific interface

- Adds config key 'jobmanager.web.address' to configure listening address
- Default is Netty's default, picking anyLocalAddress()

This closes #2680.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/718f6e4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/718f6e4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/718f6e4e

Branch: refs/heads/master
Commit: 718f6e4e372affd60522cb26eb02c95b7637e65d
Parents: a725910
Author: Bram Vogelaar <[email protected]>
Authored: Fri Oct 21 15:04:25 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Nov 7 15:21:07 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                                |  2 ++
 .../apache/flink/configuration/ConfigConstants.java | 11 +++++++++--
 flink-dist/src/main/resources/flink-conf.yaml       |  4 ++++
 .../flink/runtime/webmonitor/WebMonitorConfig.java  |  3 +++
 .../flink/runtime/webmonitor/WebRuntimeMonitor.java | 16 ++++++++++++----
 5 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 5791945..cc1f6bc 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -131,6 +131,8 @@ For Kafka and ZK, process-wide JAAS config will be created 
using the provided se
 
 - `taskmanager.log.path`: The config parameter defining the taskmanager log 
file location
 
+- `jobmanager.web.address`: Address of the JobManager's web interface 
(DEFAULT: anyLocalAddress()).
+
 - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 
8081).
 
 - `jobmanager.web.tmpdir`: This configuration parameter allows defining the 
Flink web directory to be used by the web interface. The web interface

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b377e54..fb5a760 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -21,6 +21,8 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /**
  * This class contains all constants for the configuration. That includes the 
configuration keys and
  * the default values.
@@ -546,7 +548,7 @@ public final class ConfigConstants {
        
        
        // ------------------------- JobManager Web Frontend 
----------------------
-       
+
        /**
         * The port for the runtime monitor web-frontend server.
         */
@@ -1194,7 +1196,12 @@ public final class ConfigConstants {
        
        
        // ------------------------- JobManager Web Frontend 
----------------------
-       
+
+       /** The config key for the address of the JobManager web frontend. */
+       public static final ConfigOption<String> 
DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS =
+               key("jobmanager.web.address")
+                       .noDefaultValue();
+
        /** The config key for the port of the JobManager web frontend.
         * Setting this value to {@code -1} disables the web frontend. */
        public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index 58efe12..751acda 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -60,6 +60,10 @@ parallelism.default: 1
 #==============================================================================
 # Web Frontend
 #==============================================================================
+ 
+# The address under which the web-based runtime monitor listens.
+#
+#jobmanager.web.address: 0.0.0.0
 
 # The port under which the web-based runtime monitor listens.
 # A value of -1 deactivates the web server.

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index dde6d0a..18fc5e8 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -61,6 +61,9 @@ public class WebMonitorConfig {
                this.config = config;
        }
 
+       public String getWebFrontendAddress() {
+               return 
config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
+       }
 
        public int getWebFrontendPort() {
                return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, 
DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);

http://git-wip-us.apache.org/repos/asf/flink/blob/718f6e4e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 99e0894..479b4ac 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 import akka.actor.ActorSystem;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
@@ -159,7 +160,9 @@ public class WebRuntimeMonitor implements WebMonitor {
                this.retriever = new JobManagerRetriever(this, actorSystem, 
AkkaUtils.getTimeout(config), timeout);
                
                final WebMonitorConfig cfg = new WebMonitorConfig(config);
-               
+
+               final String configuredAddress = cfg.getWebFrontendAddress();
+
                final int configuredPort = cfg.getWebFrontendPort();
                if (configuredPort < 0) {
                        throw new IllegalArgumentException("Web frontend port 
is invalid: " + configuredPort);
@@ -400,10 +403,15 @@ public class WebRuntimeMonitor implements WebMonitor {
                                .channel(NioServerSocketChannel.class)
                                .childHandler(initializer);
 
-               Channel ch = 
this.bootstrap.bind(configuredPort).sync().channel();
-               this.serverChannel = ch;
+               ChannelFuture ch;
+               if (configuredAddress == null) {
+                       ch = this.bootstrap.bind(configuredPort);
+               } else {
+                       ch = this.bootstrap.bind(configuredAddress, 
configuredPort);
+               }
+               this.serverChannel = ch.sync().channel();
 
-               InetSocketAddress bindAddress = (InetSocketAddress) 
ch.localAddress();
+               InetSocketAddress bindAddress = (InetSocketAddress) 
serverChannel.localAddress();
                String address = bindAddress.getAddress().getHostAddress();
                int port = bindAddress.getPort();
 

Reply via email to