Repository: flink Updated Branches: refs/heads/master d6126e7ca -> c685251ce
[FLINK-7408] [conf] Create WebOptions for WebRuntimeMonitor This commit moves the WebRuntimeMonitor related configuration options from JobManagerOptions to WebOptions. Moreover, it removes the prefix jobmanager. This closes #4512. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c685251c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c685251c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c685251c Branch: refs/heads/master Commit: c685251ce3b39362e45b07a12aa0cc6e7f665021 Parents: d6126e7 Author: Till Rohrmann <[email protected]> Authored: Thu Aug 10 11:42:09 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Aug 11 10:02:42 2017 +0200 ---------------------------------------------------------------------- docs/ops/config.md | 22 +-- .../client/program/StandaloneClusterClient.java | 4 +- .../ConfigOptionsDocGenerator.java | 2 +- .../flink/configuration/JobManagerOptions.java | 111 ------------- .../apache/flink/configuration/WebOptions.java | 156 +++++++++++++++++++ .../runtime/webmonitor/WebMonitorConfig.java | 12 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 20 +-- .../runtime/webmonitor/WebFrontendITCase.java | 4 +- .../webmonitor/WebRuntimeMonitorITCase.java | 16 +- .../clusterframework/BootstrapTools.java | 3 +- .../executiongraph/ExecutionGraphBuilder.java | 4 +- .../runtime/webmonitor/WebMonitorUtils.java | 6 +- .../flink/runtime/jobmanager/JobManager.scala | 10 +- .../runtime/minicluster/FlinkMiniCluster.scala | 4 +- .../JobManagerProcessReapingTest.java | 5 +- .../runtime/testutils/ZooKeeperTestUtils.java | 4 +- .../apache/flink/test/util/TestBaseUtils.java | 6 +- .../flink/yarn/YarnApplicationMasterRunner.java | 7 +- .../yarn/entrypoint/YarnEntrypointUtils.java | 5 +- 19 files changed, 225 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index e84bd46..c8d5c92 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -367,27 +367,27 @@ These parameters allow for advanced tuning. The default values are sufficient wh - `taskmanager.net.transport`: The Netty transport type, either "nio" or "epoll" (DEFAULT: **nio**). -### JobManager Web Frontend +### Web Frontend -- `jobmanager.web.port`: Port of the JobManager's web interface that displays status of running jobs and execution time breakdowns of finished jobs (DEFAULT: 8081). Setting this value to `-1` disables the web frontend. +- `web.port`: Port of the web interface that displays status of running jobs and execution time breakdowns of finished jobs (DEFAULT: 8081). Setting this value to `-1` disables the web frontend. -- `jobmanager.web.history`: The number of latest jobs that the JobManager's web front-end in its history (DEFAULT: 5). +- `web.history`: The number of latest jobs that the web front-end in its history (DEFAULT: 5). -- `jobmanager.web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`). +- `web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`). -- `jobmanager.web.checkpoints.history`: Number of checkpoint statistics to remember (DEFAULT: `10`). +- `web.checkpoints.history`: Number of checkpoint statistics to remember (DEFAULT: `10`). -- `jobmanager.web.backpressure.cleanup-interval`: Time after which cached stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins). +- `web.backpressure.cleanup-interval`: Time after which cached stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins). -- `jobmanager.web.backpressure.refresh-interval`: Time after which available stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min). +- `web.backpressure.refresh-interval`: Time after which available stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min). -- `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to take to determine back pressure (DEFAULT: `100`). +- `web.backpressure.num-samples`: Number of stack trace samples to take to determine back pressure (DEFAULT: `100`). -- `jobmanager.web.backpressure.delay-between-samples`: Delay between stack trace samples to determine back pressure (DEFAULT: `50`, 50 ms). +- `web.backpressure.delay-between-samples`: Delay between stack trace samples to determine back pressure (DEFAULT: `50`, 50 ms). -- `jobmanager.web.ssl.enabled`: Enable https access to the web frontend. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: `true`). +- `web.ssl.enabled`: Enable https access to the web frontend. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: `true`). -- `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`. +- `web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`. ### File Systems http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 19a365e..10e1bdd 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -20,7 +20,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -54,7 +54,7 @@ public class StandaloneClusterClient extends ClusterClient { @Override public String getWebInterfaceURL() { String host = getJobManagerAddress().getHostString(); - int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT); + int port = getFlinkConfiguration().getInteger(WebOptions.PORT); return "http://" + host + ":" + port; } http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java index b112843..1769c35 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java @@ -156,7 +156,7 @@ public class ConfigOptionsDocGenerator { // This is a temporary hack that should be removed once FLINK-6490 is resolved. // These options use System.getProperty("java.io.tmpdir") as the default. // As a result the generated table contains an actual path as the default, which is simply wrong. - if (option == JobManagerOptions.WEB_TMP_DIR || option.key().equals("python.dc.tmp.dir")) { + if (option == WebOptions.TMP_DIR || option.key().equals("python.dc.tmp.dir")) { defaultValue = null; } return "<tr>" + http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 76b6bed..ef3306e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -89,117 +89,6 @@ public class JobManagerOptions { key("jobmanager.resourcemanager.reconnect-interval") .defaultValue(2000L); - // ------------------------------------------------------------------------ - // JobManager web UI - // ------------------------------------------------------------------------ - - /** - * Config parameter defining the runtime monitor web-frontend server address. - */ - public static final ConfigOption<String> WEB_FRONTEND_ADDRESS = - key("jobmanager.web.address") - .noDefaultValue(); - - /** - * The port for the runtime monitor web-frontend server. - */ - public static final ConfigOption<Integer> WEB_PORT = - key("jobmanager.web.port") - .defaultValue(8081); - - /** - * The config parameter defining the Access-Control-Allow-Origin header for all - * responses from the web-frontend. - */ - public static final ConfigOption<String> WEB_ACCESS_CONTROL_ALLOW_ORIGIN = - key("jobmanager.web.access-control-allow-origin") - .defaultValue("*"); - - /** - * The config parameter defining the refresh interval for the web-frontend. - */ - public static final ConfigOption<Long> WEB_REFRESH_INTERVAL = - key("jobmanager.web.refresh-interval") - .defaultValue(3000L); - - /** - * Config parameter to override SSL support for the JobManager Web UI - */ - public static final ConfigOption<Boolean> WEB_SSL_ENABLED = - key("jobmanager.web.ssl.enabled") - .defaultValue(true); - - /** - * The config parameter defining the flink web directory to be used by the webmonitor. - */ - public static final ConfigOption<String> WEB_TMP_DIR = - key("jobmanager.web.tmpdir") - .defaultValue(System.getProperty("java.io.tmpdir")); - - /** - * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory - * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY. - */ - public static final ConfigOption<String> WEB_UPLOAD_DIR = - key("jobmanager.web.upload.dir") - .noDefaultValue(); - - /** - * The config parameter defining the number of archived jobs for the jobmanager. - */ - public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = - key("jobmanager.web.history") - .defaultValue(5); - - /** - * The log file location (may be in /log for standalone but under log directory when using YARN). - */ - public static final ConfigOption<String> WEB_LOG_PATH = - key("jobmanager.web.log.path") - .noDefaultValue(); - - /** - * Config parameter indicating whether jobs can be uploaded and run from the web-frontend. - */ - public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = - key("jobmanager.web.submit.enable") - .defaultValue(true); - - /** - * Config parameter defining the number of checkpoints to remember for recent history. - */ - public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE = - key("jobmanager.web.checkpoints.history") - .defaultValue(10); - - /** - * Time after which cached stats are cleaned up if not accessed. - */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_CLEANUP_INTERVAL = - key("jobmanager.web.backpressure.cleanup-interval") - .defaultValue(10 * 60 * 1000); - - /** - * Time after which available stats are deprecated and need to be refreshed (by resampling). - */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_REFRESH_INTERVAL = - key("jobmanager.web.backpressure.refresh-interval") - .defaultValue(60 * 1000); - - /** - * Number of stack trace samples to take to determine back pressure. - */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES = - key("jobmanager.web.backpressure.num-samples") - .defaultValue(100); - - /** - * Delay between stack trace samples to determine back pressure. - */ - public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = - key("jobmanager.web.backpressure.delay-between-samples") - .defaultValue(50); - /** * The location where the JobManager stores the archives of completed jobs. */ http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java new file mode 100644 index 0000000..f499045 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for the WebRuntimeMonitor. + */ +@PublicEvolving +public class WebOptions { + /** + * Config parameter defining the runtime monitor web-frontend server address. + */ + public static final ConfigOption<String> ADDRESS = + key("web.address") + .noDefaultValue() + .withDeprecatedKeys("jobmanager.web.address"); + + /** + * The port for the runtime monitor web-frontend server. + */ + public static final ConfigOption<Integer> PORT = + key("web.port") + .defaultValue(8081) + .withDeprecatedKeys("jobmanager.web.port"); + + /** + * The config parameter defining the Access-Control-Allow-Origin header for all + * responses from the web-frontend. + */ + public static final ConfigOption<String> ACCESS_CONTROL_ALLOW_ORIGIN = + key("web.access-control-allow-origin") + .defaultValue("*") + .withDeprecatedKeys("jobmanager.web.access-control-allow-origin"); + + /** + * The config parameter defining the refresh interval for the web-frontend. + */ + public static final ConfigOption<Long> REFRESH_INTERVAL = + key("web.refresh-interval") + .defaultValue(3000L) + .withDeprecatedKeys("jobmanager.web.refresh-interval"); + + /** + * Config parameter to override SSL support for the JobManager Web UI + */ + public static final ConfigOption<Boolean> SSL_ENABLED = + key("web.ssl.enabled") + .defaultValue(true) + .withDeprecatedKeys("jobmanager.web.ssl.enabled"); + + /** + * The config parameter defining the flink web directory to be used by the webmonitor. + */ + public static final ConfigOption<String> TMP_DIR = + key("web.tmpdir") + .defaultValue(System.getProperty("java.io.tmpdir")) + .withDeprecatedKeys("jobmanager.web.tmpdir"); + + /** + * The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory + * will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY. + */ + public static final ConfigOption<String> UPLOAD_DIR = + key("web.upload.dir") + .noDefaultValue() + .withDeprecatedKeys("jobmanager.web.upload.dir"); + + /** + * The config parameter defining the number of archived jobs for the jobmanager. + */ + public static final ConfigOption<Integer> ARCHIVE_COUNT = + key("web.history") + .defaultValue(5) + .withDeprecatedKeys("jobmanager.web.history"); + + /** + * The log file location (may be in /log for standalone but under log directory when using YARN). + */ + public static final ConfigOption<String> LOG_PATH = + key("web.log.path") + .noDefaultValue() + .withDeprecatedKeys("jobmanager.web.log.path"); + + /** + * Config parameter indicating whether jobs can be uploaded and run from the web-frontend. + */ + public static final ConfigOption<Boolean> SUBMIT_ENABLE = + key("web.submit.enable") + .defaultValue(true) + .withDeprecatedKeys("jobmanager.web.submit.enable"); + + /** + * Config parameter defining the number of checkpoints to remember for recent history. + */ + public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE = + key("web.checkpoints.history") + .defaultValue(10) + .withDeprecatedKeys("jobmanager.web.checkpoints.history"); + + /** + * Time after which cached stats are cleaned up if not accessed. + */ + public static final ConfigOption<Integer> BACKPRESSURE_CLEANUP_INTERVAL = + key("web.backpressure.cleanup-interval") + .defaultValue(10 * 60 * 1000) + .withDeprecatedKeys("jobmanager.web.backpressure.cleanup-interval"); + + /** + * Time after which available stats are deprecated and need to be refreshed (by resampling). + */ + public static final ConfigOption<Integer> BACKPRESSURE_REFRESH_INTERVAL = + key("web.backpressure.refresh-interval") + .defaultValue(60 * 1000) + .withDeprecatedKeys("jobmanager.web.backpressure.refresh-interval"); + + /** + * Number of stack trace samples to take to determine back pressure. + */ + public static final ConfigOption<Integer> BACKPRESSURE_NUM_SAMPLES = + key("web.backpressure.num-samples") + .defaultValue(100) + .withDeprecatedKeys("jobmanager.web.backpressure.num-samples"); + + /** + * Delay between stack trace samples to determine back pressure. + */ + public static final ConfigOption<Integer> BACKPRESSURE_DELAY = + key("web.backpressure.delay-between-samples") + .defaultValue(50) + .withDeprecatedKeys("jobmanager.web.backpressure.delay-between-samples"); + + + private WebOptions() { + throw new IllegalAccessError(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java index 84ca049..878bfb0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; /** * Configuration object for {@link WebMonitor}. @@ -37,22 +37,22 @@ public class WebMonitorConfig { } public String getWebFrontendAddress() { - return config.getValue(JobManagerOptions.WEB_FRONTEND_ADDRESS); + return config.getValue(WebOptions.ADDRESS); } public int getWebFrontendPort() { - return config.getInteger(JobManagerOptions.WEB_PORT); + return config.getInteger(WebOptions.PORT); } public long getRefreshInterval() { - return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL); + return config.getLong(WebOptions.REFRESH_INTERVAL); } public boolean isProgramSubmitEnabled() { - return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE); + return config.getBoolean(WebOptions.SUBMIT_ENABLE); } public String getAllowOrigin() { - return config.getString(JobManagerOptions.WEB_ACCESS_CONTROL_ALLOW_ORIGIN); + return config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index cffd23b..e27a15f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.jobmanager.MemoryArchivist; @@ -194,13 +194,13 @@ public class WebRuntimeMonitor implements WebMonitor { stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000); // Back pressure stats tracker config - int cleanUpInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL); + int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL); - int refreshInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL); + int refreshInterval = config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL); - int numSamples = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES); + int numSamples = config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES); - int delay = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY); + int delay = config.getInteger(WebOptions.BACKPRESSURE_DELAY); Time delayBetweenSamples = Time.milliseconds(delay); @@ -214,7 +214,7 @@ public class WebRuntimeMonitor implements WebMonitor { ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService); // Config to enable https access to the web-ui - boolean enableSSL = config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config); + boolean enableSSL = config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config); if (enableSSL) { LOG.info("Enabling ssl for the web frontend"); @@ -318,7 +318,7 @@ public class WebRuntimeMonitor implements WebMonitor { // DELETE is the preferred way of stopping a job (Rest-conform) delete(router, new JobStoppingHandler()); - int maxCachedEntries = config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE); + int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers @@ -531,14 +531,14 @@ public class WebRuntimeMonitor implements WebMonitor { } private String getBaseDirStr(Configuration configuration) { - return configuration.getString(JobManagerOptions.WEB_TMP_DIR); + return configuration.getString(WebOptions.TMP_DIR); } private File getUploadDir(Configuration configuration) { - File baseDir = new File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR, + File baseDir = new File(configuration.getString(WebOptions.UPLOAD_DIR, getBaseDirStr(configuration))); - boolean uploadDirSpecified = configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR); + boolean uploadDirSpecified = configuration.contains(WebOptions.UPLOAD_DIR); return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index acf0e3b..7cd2932 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -87,7 +87,7 @@ public class WebFrontendITCase extends TestLogger { Files.createFile(logFile.toPath()); Files.createFile(outFile.toPath()); - config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath()); + config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); cluster = new LocalFlinkMiniCluster(config, false); http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 8c4d8c3..fe16445 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -144,8 +144,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath()); Files.createFile(new File(logDir, "jobmanager.out").toPath()); - config.setInteger(JobManagerOptions.WEB_PORT, 0); - config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); + config.setInteger(WebOptions.PORT, 0); + config.setString(WebOptions.LOG_PATH, logFile.toString()); highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( config, @@ -168,7 +168,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { String[] jobManagerAddress = new String[2]; for (int i = 0; i < jobManager.length; i++) { Configuration jmConfig = config.clone(); - jmConfig.setInteger(JobManagerOptions.WEB_PORT, + jmConfig.setInteger(WebOptions.PORT, webMonitor[i].getServerPort()); jobManager[i] = JobManager.startJobManagerActors( @@ -294,8 +294,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { Files.createFile(new File(logDir, "jobmanager.out").toPath()); final Configuration config = new Configuration(); - config.setInteger(JobManagerOptions.WEB_PORT, 0); - config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); + config.setInteger(WebOptions.PORT, 0); + config.setString(WebOptions.LOG_PATH, logFile.toString()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString()); @@ -473,8 +473,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { // Web frontend on random port Configuration config = new Configuration(); - config.setInteger(JobManagerOptions.WEB_PORT, 0); - config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); + config.setInteger(WebOptions.PORT, 0); + config.setString(WebOptions.LOG_PATH, logFile.toString()); HighAvailabilityServices highAvailabilityServices = flink.highAvailabilityServices(); http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 4cf8166..f204393 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.webmonitor.WebMonitor; @@ -188,7 +189,7 @@ public class BootstrapTools { config.setString(JobManagerOptions.ADDRESS, address.host().get()); config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString())); - if (config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) { + if (config.getInteger(WebOptions.PORT, 0) >= 0) { logger.info("Starting JobManager Web Frontend"); // start the web frontend. we need to load this dynamically http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 3885e8d..b79503a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -208,7 +208,7 @@ public class ExecutionGraphBuilder { } // Maximum number of remembered checkpoints - int historySize = jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE); + int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker( historySize, http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 12c2d8e..9ebb126 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.execution.ExecutionState; @@ -79,14 +79,14 @@ public final class WebMonitorUtils { if (logFilePath == null) { LOG.warn("Log file environment variable '{}' is not set.", logEnv); - logFilePath = config.getString(JobManagerOptions.WEB_LOG_PATH); + logFilePath = config.getString(WebOptions.LOG_PATH); } // not configured, cannot serve log files if (logFilePath == null || logFilePath.length() < 4) { LOG.warn("JobManager log files are unavailable in the web dashboard. " + "Log file location not found in environment variable '{}' or configuration key '{}'.", - logEnv, JobManagerOptions.WEB_LOG_PATH.key()); + logEnv, WebOptions.LOG_PATH); return new LogFileLocation(null, null); } http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 7adf456..e490b48 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -170,7 +170,7 @@ class JobManager( * to run in the actor system of the associated job manager. */ val webMonitorPort : Int = flinkConfiguration.getInteger( - JobManagerOptions.WEB_PORT.key(), -1) + WebOptions.PORT, -1) /** The default directory for savepoints. */ val defaultSavepointDir: String = flinkConfiguration.getString(CoreOptions.SAVEPOINT_DIRECTORY) @@ -2216,7 +2216,7 @@ object JobManager { : (ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = { val webMonitor: Option[WebMonitor] = - if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) { + if (configuration.getInteger(WebOptions.PORT, 0) >= 0) { LOG.info("Starting JobManager web frontend") // start the web frontend. we need to load this dynamically @@ -2234,7 +2234,7 @@ object JobManager { // Reset the port (necessary in case of automatic port selection) webMonitor.foreach{ monitor => configuration.setInteger( - JobManagerOptions.WEB_PORT, monitor.getServerPort) } + WebOptions.PORT, monitor.getServerPort) } try { // bring up the job manager actor @@ -2395,7 +2395,7 @@ object JobManager { } if (cliOptions.getWebUIPort() >= 0) { - configuration.setInteger(JobManagerOptions.WEB_PORT, cliOptions.getWebUIPort()) + configuration.setInteger(WebOptions.PORT, cliOptions.getWebUIPort()) } if (cliOptions.getHost() != null) { @@ -2474,7 +2474,7 @@ object JobManager { val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) - val archiveCount = configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT) + val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT) val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR) http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 6f13b9f..bc323cc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} -import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions} +import org.apache.flink.configuration._ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} @@ -387,7 +387,7 @@ abstract class FlinkMiniCluster( : Option[WebMonitor] = { if( config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) && - config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) { + config.getInteger(WebOptions.PORT, 0) >= 0) { LOG.info("Starting JobManger web frontend") // start the new web frontend. we need to load this dynamically http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java index df53d59..38b8431 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java @@ -26,7 +26,8 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; -import org.apache.flink.configuration.JobManagerOptions; + +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -203,7 +204,7 @@ public class JobManagerProcessReapingTest extends TestLogger { public static void main(String[] args) { try { Configuration config = new Configuration(); - config.setInteger(JobManagerOptions.WEB_PORT, -1); + config.setInteger(WebOptions.PORT, -1); JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0); System.exit(0); http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 9435ebf..9af8aaf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -22,7 +22,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; @@ -66,7 +66,7 @@ public class ZooKeeperTestUtils { checkNotNull(fsStateHandlePath, "File state handle backend path"); // Web frontend, you have been dismissed. Sorry. - config.setInteger(JobManagerOptions.WEB_PORT, -1); + config.setInteger(WebOptions.PORT, -1); // ZooKeeper recovery mode config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index da6d9a8..c0daf1a 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -147,8 +147,8 @@ public class TestBaseUtils extends TestLogger { config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); config.setString(AkkaOptions.STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); - config.setInteger(JobManagerOptions.WEB_PORT, 8081); - config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString()); + config.setInteger(WebOptions.PORT, 8081); + config.setString(WebOptions.LOG_PATH, logFile.toString()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index e951df4..88cc585 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -365,7 +366,7 @@ public class YarnApplicationMasterRunner { LOG); String protocol = "http://"; - if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + if (config.getBoolean(WebOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { protocol = "https://"; } final String webMonitorURL = webMonitor == null ? null : @@ -511,8 +512,8 @@ public class YarnApplicationMasterRunner { } // if a web monitor shall be started, set the port to random binding - if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) { - configuration.setInteger(JobManagerOptions.WEB_PORT, 0); + if (configuration.getInteger(WebOptions.PORT, 0) >= 0) { + configuration.setInteger(WebOptions.PORT, 0); } // if the user has set the deprecated YARN-specific config keys, we add the http://git-wip-us.apache.org/repos/asf/flink/blob/c685251c/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index e8fccac..d36e769 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; @@ -105,8 +106,8 @@ public class YarnEntrypointUtils { } // if a web monitor shall be started, set the port to random binding - if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) { - configuration.setInteger(JobManagerOptions.WEB_PORT, 0); + if (configuration.getInteger(WebOptions.PORT, 0) >= 0) { + configuration.setInteger(WebOptions.PORT, 0); } // if the user has set the deprecated YARN-specific config keys, we add the
