Repository: flink
Updated Branches:
  refs/heads/master d6126e7ca -> c685251ce


[FLINK-7408] [conf] Create WebOptions for WebRuntimeMonitor

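This commit moves the WebRuntimeMonitor related configuration options from
JobManagerOptions to WebOptions. Moreover, it removes the "jobmanager." prefix
from the option keys (e.g. "jobmanager.web.port" becomes "web.port"); the old
keys are kept as deprecated keys.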

This closes #4512.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c685251c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c685251c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c685251c

Branch: refs/heads/master
Commit: c685251ce3b39362e45b07a12aa0cc6e7f665021
Parents: d6126e7
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Aug 10 11:42:09 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Aug 11 10:02:42 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |  22 +--
 .../client/program/StandaloneClusterClient.java |   4 +-
 .../ConfigOptionsDocGenerator.java              |   2 +-
 .../flink/configuration/JobManagerOptions.java  | 111 -------------
 .../apache/flink/configuration/WebOptions.java  | 156 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorConfig.java    |  12 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  20 +--
 .../runtime/webmonitor/WebFrontendITCase.java   |   4 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |  16 +-
 .../clusterframework/BootstrapTools.java        |   3 +-
 .../executiongraph/ExecutionGraphBuilder.java   |   4 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  10 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   4 +-
 .../JobManagerProcessReapingTest.java           |   5 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |   4 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   6 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   7 +-
 .../yarn/entrypoint/YarnEntrypointUtils.java    |   5 +-
 19 files changed, 225 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index e84bd46..c8d5c92 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -367,27 +367,27 @@ These parameters allow for advanced tuning. The default 
values are sufficient wh
 
 - `taskmanager.net.transport`: The Netty transport type, either "nio" or 
"epoll" (DEFAULT: **nio**).
 
-### JobManager Web Frontend
+### Web Frontend
 
-- `jobmanager.web.port`: Port of the JobManager's web interface that displays 
status of running jobs and execution time breakdowns of finished jobs (DEFAULT: 
8081). Setting this value to `-1` disables the web frontend.
+- `web.port`: Port of the web interface that displays status of running jobs 
and execution time breakdowns of finished jobs (DEFAULT: 8081). Setting this 
value to `-1` disables the web frontend.
 
-- `jobmanager.web.history`: The number of latest jobs that the JobManager's 
web front-end in its history (DEFAULT: 5).
+- `web.history`: The number of latest jobs that the web front-end holds in its 
history (DEFAULT: 5).
 
-- `jobmanager.web.checkpoints.disable`: Disables checkpoint statistics 
(DEFAULT: `false`).
+- `web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`).
 
-- `jobmanager.web.checkpoints.history`: Number of checkpoint statistics to 
remember (DEFAULT: `10`).
+- `web.checkpoints.history`: Number of checkpoint statistics to remember 
(DEFAULT: `10`).
 
-- `jobmanager.web.backpressure.cleanup-interval`: Time after which cached 
stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins).
+- `web.backpressure.cleanup-interval`: Time after which cached stats are 
cleaned up if not accessed (DEFAULT: `600000`, 10 mins).
 
-- `jobmanager.web.backpressure.refresh-interval`: Time after which available 
stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min).
+- `web.backpressure.refresh-interval`: Time after which available stats are 
deprecated and need to be refreshed (DEFAULT: `60000`, 1 min).
 
-- `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to 
take to determine back pressure (DEFAULT: `100`).
+- `web.backpressure.num-samples`: Number of stack trace samples to take to 
determine back pressure (DEFAULT: `100`).
 
-- `jobmanager.web.backpressure.delay-between-samples`: Delay between stack 
trace samples to determine back pressure (DEFAULT: `50`, 50 ms).
+- `web.backpressure.delay-between-samples`: Delay between stack trace samples 
to determine back pressure (DEFAULT: `50`, 50 ms).
 
-- `jobmanager.web.ssl.enabled`: Enable https access to the web frontend. This 
is applicable only when the global ssl flag security.ssl.enabled is set to true 
(DEFAULT: `true`).
+- `web.ssl.enabled`: Enable https access to the web frontend. This is 
applicable only when the global ssl flag security.ssl.enabled is set to true 
(DEFAULT: `true`).
 
-- `jobmanager.web.access-control-allow-origin`: Enable custom access control 
parameter for allow origin header, default is `*`.
+- `web.access-control-allow-origin`: Enable custom access control parameter 
for allow origin header, default is `*`.
 
 ### File Systems
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 19a365e..10e1bdd 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -20,7 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -54,7 +54,7 @@ public class StandaloneClusterClient extends ClusterClient {
        @Override
        public String getWebInterfaceURL() {
                String host = getJobManagerAddress().getHostString();
-               int port = 
getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
+               int port = getFlinkConfiguration().getInteger(WebOptions.PORT);
                return "http://" +  host + ":" + port;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
index b112843..1769c35 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
@@ -156,7 +156,7 @@ public class ConfigOptionsDocGenerator {
                // This is a temporary hack that should be removed once 
FLINK-6490 is resolved.
                // These options use System.getProperty("java.io.tmpdir") as 
the default.
                // As a result the generated table contains an actual path as 
the default, which is simply wrong.
-               if (option == JobManagerOptions.WEB_TMP_DIR || 
option.key().equals("python.dc.tmp.dir")) {
+               if (option == WebOptions.TMP_DIR || 
option.key().equals("python.dc.tmp.dir")) {
                        defaultValue = null;
                }
                return "<tr>" +

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 76b6bed..ef3306e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -89,117 +89,6 @@ public class JobManagerOptions {
                key("jobmanager.resourcemanager.reconnect-interval")
                .defaultValue(2000L);
 
-       // 
------------------------------------------------------------------------
-       //  JobManager web UI
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Config parameter defining the runtime monitor web-frontend server 
address.
-        */
-       public static final ConfigOption<String> WEB_FRONTEND_ADDRESS =
-               key("jobmanager.web.address")
-                       .noDefaultValue();
-
-       /**
-        * The port for the runtime monitor web-frontend server.
-        */
-       public static final ConfigOption<Integer> WEB_PORT =
-               key("jobmanager.web.port")
-               .defaultValue(8081);
-
-       /**
-        * The config parameter defining the Access-Control-Allow-Origin header 
for all
-        * responses from the web-frontend.
-        */
-       public static final ConfigOption<String> 
WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
-               key("jobmanager.web.access-control-allow-origin")
-                       .defaultValue("*");
-
-       /**
-        * The config parameter defining the refresh interval for the 
web-frontend.
-        */
-       public static final ConfigOption<Long> WEB_REFRESH_INTERVAL =
-               key("jobmanager.web.refresh-interval")
-                       .defaultValue(3000L);
-       
-       /**
-        * Config parameter to override SSL support for the JobManager Web UI
-        */
-       public static final ConfigOption<Boolean> WEB_SSL_ENABLED =
-               key("jobmanager.web.ssl.enabled")
-               .defaultValue(true);
-
-       /**
-        * The config parameter defining the flink web directory to be used by 
the webmonitor.
-        */
-       public static final ConfigOption<String> WEB_TMP_DIR =
-               key("jobmanager.web.tmpdir")
-               .defaultValue(System.getProperty("java.io.tmpdir"));
-
-       /**
-        * The config parameter defining the directory for uploading the job 
jars. If not specified a dynamic directory
-        * will be used under the directory specified by 
JOB_MANAGER_WEB_TMPDIR_KEY.
-        */
-       public static final ConfigOption<String> WEB_UPLOAD_DIR =
-               key("jobmanager.web.upload.dir")
-               .noDefaultValue();
-
-       /**
-        * The config parameter defining the number of archived jobs for the 
jobmanager.
-        */
-       public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT =
-               key("jobmanager.web.history")
-               .defaultValue(5);
-
-       /**
-        * The log file location (may be in /log for standalone but under log 
directory when using YARN).
-        */
-       public static final ConfigOption<String> WEB_LOG_PATH =
-               key("jobmanager.web.log.path")
-               .noDefaultValue();
-
-       /**
-        * Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend.
-        */
-       public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE =
-               key("jobmanager.web.submit.enable")
-               .defaultValue(true);
-
-       /**
-        * Config parameter defining the number of checkpoints to remember for 
recent history.
-        */
-       public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE =
-               key("jobmanager.web.checkpoints.history")
-               .defaultValue(10);
-
-       /**
-        * Time after which cached stats are cleaned up if not accessed.
-        */
-       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_CLEANUP_INTERVAL =
-               key("jobmanager.web.backpressure.cleanup-interval")
-               .defaultValue(10 * 60 * 1000);
-
-       /**
-        * Time after which available stats are deprecated and need to be 
refreshed (by resampling).
-        */
-       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_REFRESH_INTERVAL =
-               key("jobmanager.web.backpressure.refresh-interval")
-               .defaultValue(60 * 1000);
-
-       /**
-        * Number of stack trace samples to take to determine back pressure.
-        */
-       public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES =
-               key("jobmanager.web.backpressure.num-samples")
-               .defaultValue(100);
-
-       /**
-        * Delay between stack trace samples to determine back pressure.
-        */
-       public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY =
-               key("jobmanager.web.backpressure.delay-between-samples")
-               .defaultValue(50);
-
        /**
         * The location where the JobManager stores the archives of completed 
jobs.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
new file mode 100644
index 0000000..f499045
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for the WebRuntimeMonitor.
+ */
+@PublicEvolving
+public class WebOptions {
+       /**
+        * Config parameter defining the runtime monitor web-frontend server 
address.
+        */
+       public static final ConfigOption<String> ADDRESS =
+               key("web.address")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("jobmanager.web.address");
+
+       /**
+        * The port for the runtime monitor web-frontend server.
+        */
+       public static final ConfigOption<Integer> PORT =
+               key("web.port")
+                       .defaultValue(8081)
+                       .withDeprecatedKeys("jobmanager.web.port");
+
+       /**
+        * The config parameter defining the Access-Control-Allow-Origin header 
for all
+        * responses from the web-frontend.
+        */
+       public static final ConfigOption<String> ACCESS_CONTROL_ALLOW_ORIGIN =
+               key("web.access-control-allow-origin")
+                       .defaultValue("*")
+                       
.withDeprecatedKeys("jobmanager.web.access-control-allow-origin");
+
+       /**
+        * The config parameter defining the refresh interval for the 
web-frontend.
+        */
+       public static final ConfigOption<Long> REFRESH_INTERVAL =
+               key("web.refresh-interval")
+                       .defaultValue(3000L)
+                       .withDeprecatedKeys("jobmanager.web.refresh-interval");
+
+       /**
+        * Config parameter to override SSL support for the JobManager Web UI.
+        */
+       public static final ConfigOption<Boolean> SSL_ENABLED =
+               key("web.ssl.enabled")
+                       .defaultValue(true)
+                       .withDeprecatedKeys("jobmanager.web.ssl.enabled");
+
+       /**
+        * The config parameter defining the flink web directory to be used by 
the webmonitor.
+        */
+       public static final ConfigOption<String> TMP_DIR =
+               key("web.tmpdir")
+                       .defaultValue(System.getProperty("java.io.tmpdir"))
+                       .withDeprecatedKeys("jobmanager.web.tmpdir");
+
+       /**
+        * The config parameter defining the directory for uploading the job 
jars. If not specified, a dynamic directory
+        * will be used under the directory specified by {@link #TMP_DIR}.
+        */
+       public static final ConfigOption<String> UPLOAD_DIR =
+               key("web.upload.dir")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("jobmanager.web.upload.dir");
+
+       /**
+        * The config parameter defining the number of archived jobs for the 
jobmanager.
+        */
+       public static final ConfigOption<Integer> ARCHIVE_COUNT =
+               key("web.history")
+                       .defaultValue(5)
+                       .withDeprecatedKeys("jobmanager.web.history");
+
+       /**
+        * The log file location (may be in /log for standalone but under log 
directory when using YARN).
+        */
+       public static final ConfigOption<String> LOG_PATH =
+               key("web.log.path")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("jobmanager.web.log.path");
+
+       /**
+        * Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend.
+        */
+       public static final ConfigOption<Boolean> SUBMIT_ENABLE =
+               key("web.submit.enable")
+                       .defaultValue(true)
+                       .withDeprecatedKeys("jobmanager.web.submit.enable");
+
+       /**
+        * Config parameter defining the number of checkpoints to remember for 
recent history.
+        */
+       public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE =
+               key("web.checkpoints.history")
+                       .defaultValue(10)
+                       
.withDeprecatedKeys("jobmanager.web.checkpoints.history");
+
+       /**
+        * Time after which cached stats are cleaned up if not accessed.
+        */
+       public static final ConfigOption<Integer> BACKPRESSURE_CLEANUP_INTERVAL 
=
+               key("web.backpressure.cleanup-interval")
+                       .defaultValue(10 * 60 * 1000)
+                       
.withDeprecatedKeys("jobmanager.web.backpressure.cleanup-interval");
+
+       /**
+        * Time after which available stats are deprecated and need to be 
refreshed (by resampling).
+        */
+       public static final ConfigOption<Integer> BACKPRESSURE_REFRESH_INTERVAL 
=
+               key("web.backpressure.refresh-interval")
+                       .defaultValue(60 * 1000)
+                       
.withDeprecatedKeys("jobmanager.web.backpressure.refresh-interval");
+
+       /**
+        * Number of stack trace samples to take to determine back pressure.
+        */
+       public static final ConfigOption<Integer> BACKPRESSURE_NUM_SAMPLES =
+               key("web.backpressure.num-samples")
+                       .defaultValue(100)
+                       
.withDeprecatedKeys("jobmanager.web.backpressure.num-samples");
+
+       /**
+        * Delay between stack trace samples to determine back pressure.
+        */
+       public static final ConfigOption<Integer> BACKPRESSURE_DELAY =
+               key("web.backpressure.delay-between-samples")
+                       .defaultValue(50)
+                       
.withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples");
+
+
+       private WebOptions() {
+               throw new IllegalAccessError();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 84ca049..878bfb0 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 
 /**
  * Configuration object for {@link WebMonitor}.
@@ -37,22 +37,22 @@ public class WebMonitorConfig {
        }
 
        public String getWebFrontendAddress() {
-               return config.getValue(JobManagerOptions.WEB_FRONTEND_ADDRESS);
+               return config.getValue(WebOptions.ADDRESS);
        }
 
        public int getWebFrontendPort() {
-               return config.getInteger(JobManagerOptions.WEB_PORT);
+               return config.getInteger(WebOptions.PORT);
        }
 
        public long getRefreshInterval() {
-               return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL);
+               return config.getLong(WebOptions.REFRESH_INTERVAL);
        }
 
        public boolean isProgramSubmitEnabled() {
-               return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE);
+               return config.getBoolean(WebOptions.SUBMIT_ENABLE);
        }
 
        public String getAllowOrigin() {
-               return 
config.getString(JobManagerOptions.WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
+               return config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index cffd23b..e27a15f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -194,13 +194,13 @@ public class WebRuntimeMonitor implements WebMonitor {
                stackTraceSamples = new 
StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
 
                // Back pressure stats tracker config
-               int cleanUpInterval = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL);
+               int cleanUpInterval = 
config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
 
-               int refreshInterval = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL);
+               int refreshInterval = 
config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
 
-               int numSamples = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES);
+               int numSamples = 
config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES);
 
-               int delay = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY);
+               int delay = config.getInteger(WebOptions.BACKPRESSURE_DELAY);
 
                Time delayBetweenSamples = Time.milliseconds(delay);
 
@@ -214,7 +214,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
 
                // Config to enable https access to the web-ui
-               boolean enableSSL = 
config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) &&     
SSLUtils.getSSLEnabled(config);
+               boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) 
&& SSLUtils.getSSLEnabled(config);
 
                if (enableSSL) {
                        LOG.info("Enabling ssl for the web frontend");
@@ -318,7 +318,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                // DELETE is the preferred way of stopping a job (Rest-conform)
                delete(router, new JobStoppingHandler());
 
-               int maxCachedEntries = 
config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
+               int maxCachedEntries = 
config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
                CheckpointStatsCache cache = new 
CheckpointStatsCache(maxCachedEntries);
 
                // Register the checkpoint stats handlers
@@ -531,14 +531,14 @@ public class WebRuntimeMonitor implements WebMonitor {
        }
 
        private String getBaseDirStr(Configuration configuration) {
-               return configuration.getString(JobManagerOptions.WEB_TMP_DIR);
+               return configuration.getString(WebOptions.TMP_DIR);
        }
 
        private File getUploadDir(Configuration configuration) {
-               File baseDir = new 
File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR,
+               File baseDir = new 
File(configuration.getString(WebOptions.UPLOAD_DIR,
                        getBaseDirStr(configuration)));
 
-               boolean uploadDirSpecified = 
configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR);
+               boolean uploadDirSpecified = 
configuration.contains(WebOptions.UPLOAD_DIR);
                return uploadDirSpecified ? baseDir : new File(baseDir, 
"flink-web-" + UUID.randomUUID());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index acf0e3b..7cd2932 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -87,7 +87,7 @@ public class WebFrontendITCase extends TestLogger {
                Files.createFile(logFile.toPath());
                Files.createFile(outFile.toPath());
 
-               config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.getAbsolutePath());
+               config.setString(WebOptions.LOG_PATH, 
logFile.getAbsolutePath());
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
                cluster = new LocalFlinkMiniCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 8c4d8c3..fe16445 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -144,8 +144,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
-                       config.setInteger(JobManagerOptions.WEB_PORT, 0);
-                       config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
+                       config.setInteger(WebOptions.PORT, 0);
+                       config.setString(WebOptions.LOG_PATH, 
logFile.toString());
 
                        highAvailabilityServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                                config,
@@ -168,7 +168,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        String[] jobManagerAddress = new String[2];
                        for (int i = 0; i < jobManager.length; i++) {
                                Configuration jmConfig = config.clone();
-                               jmConfig.setInteger(JobManagerOptions.WEB_PORT,
+                               jmConfig.setInteger(WebOptions.PORT,
                                                webMonitor[i].getServerPort());
 
                                jobManager[i] = 
JobManager.startJobManagerActors(
@@ -294,8 +294,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
                        final Configuration config = new Configuration();
-                       config.setInteger(JobManagerOptions.WEB_PORT, 0);
-                       config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
+                       config.setInteger(WebOptions.PORT, 0);
+                       config.setString(WebOptions.LOG_PATH, 
logFile.toString());
                        config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
                        
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeper.getConnectString());
 
@@ -473,8 +473,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
                // Web frontend on random port
                Configuration config = new Configuration();
-               config.setInteger(JobManagerOptions.WEB_PORT, 0);
-               config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
+               config.setInteger(WebOptions.PORT, 0);
+               config.setString(WebOptions.LOG_PATH, logFile.toString());
 
                HighAvailabilityServices highAvailabilityServices = 
flink.highAvailabilityServices();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 4cf8166..f204393 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -188,7 +189,7 @@ public class BootstrapTools {
                config.setString(JobManagerOptions.ADDRESS, 
address.host().get());
                config.setInteger(JobManagerOptions.PORT, 
Integer.parseInt(address.port().get().toString()));
 
-               if (config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 
0) {
+               if (config.getInteger(WebOptions.PORT, 0) >= 0) {
                        logger.info("Starting JobManager Web Frontend");
 
                        // start the web frontend. we need to load this 
dynamically

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 3885e8d..b79503a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -208,7 +208,7 @@ public class ExecutionGraphBuilder {
                        }
 
                        // Maximum number of remembered checkpoints
-                       int historySize = 
jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
+                       int historySize = 
jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 
                        CheckpointStatsTracker checkpointStatsTracker = new 
CheckpointStatsTracker(
                                        historySize,

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 12c2d8e..9ebb126 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -79,14 +79,14 @@ public final class WebMonitorUtils {
 
                        if (logFilePath == null) {
                                LOG.warn("Log file environment variable '{}' is 
not set.", logEnv);
-                               logFilePath = 
config.getString(JobManagerOptions.WEB_LOG_PATH);
+                               logFilePath = 
config.getString(WebOptions.LOG_PATH);
                        }
 
                        // not configured, cannot serve log files
                        if (logFilePath == null || logFilePath.length() < 4) {
                                LOG.warn("JobManager log files are unavailable 
in the web dashboard. " +
                                        "Log file location not found in 
environment variable '{}' or configuration key '{}'.",
-                                       logEnv, 
JobManagerOptions.WEB_LOG_PATH.key());
+                                       logEnv, WebOptions.LOG_PATH);
                                return new LogFileLocation(null, null);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 7adf456..e490b48 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -170,7 +170,7 @@ class JobManager(
    * to run in the actor system of the associated job manager.
    */
   val webMonitorPort : Int = flinkConfiguration.getInteger(
-    JobManagerOptions.WEB_PORT.key(), -1)
+    WebOptions.PORT, -1)
 
   /** The default directory for savepoints. */
   val defaultSavepointDir: String = 
flinkConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY)
@@ -2216,7 +2216,7 @@ object JobManager {
     : (ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
 
     val webMonitor: Option[WebMonitor] =
-      if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
+      if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
 
         // start the web frontend. we need to load this dynamically
@@ -2234,7 +2234,7 @@ object JobManager {
 
     // Reset the port (necessary in case of automatic port selection)
     webMonitor.foreach{ monitor => configuration.setInteger(
-      JobManagerOptions.WEB_PORT, monitor.getServerPort) }
+      WebOptions.PORT, monitor.getServerPort) }
 
     try {
       // bring up the job manager actor
@@ -2395,7 +2395,7 @@ object JobManager {
     }
 
     if (cliOptions.getWebUIPort() >= 0) {
-      configuration.setInteger(JobManagerOptions.WEB_PORT, 
cliOptions.getWebUIPort())
+      configuration.setInteger(WebOptions.PORT, cliOptions.getWebUIPort())
     }
 
     if (cliOptions.getHost() != null) {
@@ -2474,7 +2474,7 @@ object JobManager {
 
     val restartStrategy = 
RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
-    val archiveCount = 
configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT)
+    val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT)
 
     val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 6f13b9f..bc323cc 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -387,7 +387,7 @@ abstract class FlinkMiniCluster(
     : Option[WebMonitor] = {
     if(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
-        config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
+        config.getInteger(WebOptions.PORT, 0) >= 0) {
 
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index df53d59..38b8431 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -26,7 +26,8 @@ import static 
org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
-import org.apache.flink.configuration.JobManagerOptions;
+
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -203,7 +204,7 @@ public class JobManagerProcessReapingTest extends 
TestLogger {
                public static void main(String[] args) {
                        try {
                                Configuration config = new Configuration();
-                               config.setInteger(JobManagerOptions.WEB_PORT, 
-1);
+                               config.setInteger(WebOptions.PORT, -1);
 
                                JobManager.runJobManager(config, 
JobManagerMode.CLUSTER, "localhost", 0);
                                System.exit(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 9435ebf..9af8aaf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
@@ -66,7 +66,7 @@ public class ZooKeeperTestUtils {
                checkNotNull(fsStateHandlePath, "File state handle backend 
path");
 
                // Web frontend, you have been dismissed. Sorry.
-               config.setInteger(JobManagerOptions.WEB_PORT, -1);
+               config.setInteger(WebOptions.PORT, -1);
 
                // ZooKeeper recovery mode
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index da6d9a8..c0daf1a 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -147,8 +147,8 @@ public class TestBaseUtils extends TestLogger {
                config.setString(AkkaOptions.ASK_TIMEOUT, 
DEFAULT_AKKA_ASK_TIMEOUT + "s");
                config.setString(AkkaOptions.STARTUP_TIMEOUT, 
DEFAULT_AKKA_STARTUP_TIMEOUT);
 
-               config.setInteger(JobManagerOptions.WEB_PORT, 8081);
-               config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
+               config.setInteger(WebOptions.PORT, 8081);
+               config.setString(WebOptions.LOG_PATH, logFile.toString());
 
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index e951df4..88cc585 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -365,7 +366,7 @@ public class YarnApplicationMasterRunner {
                                LOG);
 
                        String protocol = "http://";
-                       if 
(config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && 
SSLUtils.getSSLEnabled(config)) {
+                       if (config.getBoolean(WebOptions.SSL_ENABLED) && 
SSLUtils.getSSLEnabled(config)) {
                                protocol = "https://";
                        }
                        final String webMonitorURL = webMonitor == null ? null :
@@ -511,8 +512,8 @@ public class YarnApplicationMasterRunner {
                }
 
                // if a web monitor shall be started, set the port to random 
binding
-               if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 
0) >= 0) {
-                       configuration.setInteger(JobManagerOptions.WEB_PORT, 0);
+               if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
+                       configuration.setInteger(WebOptions.PORT, 0);
                }
 
                // if the user has set the deprecated YARN-specific config 
keys, we add the

http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index e8fccac..d36e769 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -105,8 +106,8 @@ public class YarnEntrypointUtils {
                }
 
                // if a web monitor shall be started, set the port to random 
binding
-               if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 
0) >= 0) {
-                       configuration.setInteger(JobManagerOptions.WEB_PORT, 0);
+               if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
+                       configuration.setInteger(WebOptions.PORT, 0);
                }
 
                // if the user has set the deprecated YARN-specific config 
keys, we add the

Reply via email to