[FLINK-6144] [config] Port JobManager configuration options to ConfigOption

This PR ports the existing JobManager configuration options to the
JobManagerOptions class using the ConfigOption abstraction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2ca1295
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2ca1295
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2ca1295

Branch: refs/heads/master
Commit: e2ca12957435e9ac3d811ebec611c4465525dea9
Parents: 0236992
Author: Till Rohrmann <[email protected]>
Authored: Tue Mar 21 17:07:26 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Mar 21 21:37:21 2017 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  66 ++++++++-
 .../flink/configuration/JobManagerOptions.java  | 142 +++++++++++++++++++
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +-
 .../runtime/jobmanager/JobManagerOptions.java   |  38 -----
 5 files changed, 204 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c7c8b1a..318c7e0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -117,13 +117,19 @@ public final class ConfigConstants {
        /**
         * The config parameter defining the network address to connect to
         * for communication with the job manager.
+        *
+        * @deprecated Use {@link JobManagerOptions#ADDRESS} instead
         */
+       @Deprecated
        public static final String JOB_MANAGER_IPC_ADDRESS_KEY = 
"jobmanager.rpc.address";
 
        /**
         * The config parameter defining the network port to connect to
         * for communication with the job manager.
+        *
+        * @deprecated Use {@link JobManagerOptions#PORT} instead
         */
+       @Deprecated
        public static final String JOB_MANAGER_IPC_PORT_KEY = 
"jobmanager.rpc.port";
 
        /**
@@ -570,36 +576,59 @@ public final class ConfigConstants {
 
        /**
         * The port for the runtime monitor web-frontend server.
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_PORT} instead.
         */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_PORT_KEY = 
"jobmanager.web.port";
 
        /**
         * Config parameter to override SSL support for the JobManager Web UI
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_SSL_ENABLED} instead.
         */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_SSL_ENABLED = 
"jobmanager.web.ssl.enabled";
 
        /**
         * The config parameter defining the flink web directory to be used by 
the webmonitor.
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_TMP_DIR} instead.
         */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_TMPDIR_KEY = 
"jobmanager.web.tmpdir";
 
        /**
         * The config parameter defining the directory for uploading the job 
jars. If not specified a dynamic directory
         * will be used under the directory specified by 
JOB_MANAGER_WEB_TMPDIR_KEY.
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_UPLOAD_DIR} instead.
         */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_UPLOAD_DIR_KEY = 
"jobmanager.web.upload.dir";
 
        /**
         * The config parameter defining the number of archived jobs for the 
jobmanager
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead.
         */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = 
"jobmanager.web.history";
 
        /**
         * The log file location (may be in /log for standalone but under log 
directory when using YARN)
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_LOG_PATH} instead.
         */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = 
"jobmanager.web.log.path";
 
-       /** Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend. */
+       /**
+        * Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend.
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead.
+        */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = 
"jobmanager.web.submit.enable";
 
        /**
@@ -610,19 +639,44 @@ public final class ConfigConstants {
        @Deprecated
        public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = 
"jobmanager.web.checkpoints.disable";
 
-       /** Config parameter defining the number of checkpoints to remember for 
recent history. */
+       /**
+        * Config parameter defining the number of checkpoints to remember for 
recent history.
+        *
+        * @deprecated Use {@link 
JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead.
+        */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 
"jobmanager.web.checkpoints.history";
 
-       /** Time after which cached stats are cleaned up if not accessed. */
+       /**
+        * Time after which cached stats are cleaned up if not accessed.
+        *
+        * @deprecated Use {@link 
JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead.
+        */
+       @Deprecated
        public static final String 
JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 
"jobmanager.web.backpressure.cleanup-interval";
 
-       /** Time after which available stats are deprecated and need to be 
refreshed (by resampling). */
+       /**
+        * Time after which available stats are deprecated and need to be 
refreshed (by resampling).
+        *
+        * @deprecated Use {@link 
JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead.
+        */
+       @Deprecated
        public static final String 
JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 
"jobmanager.web.backpressure.refresh-interval";
 
-       /** Number of stack trace samples to take to determine back pressure. */
+       /**
+        * Number of stack trace samples to take to determine back pressure.
+        *
+        * @deprecated Use {@link 
JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead.
+        */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 
"jobmanager.web.backpressure.num-samples";
 
-       /** Delay between stack trace samples to determine back pressure. */
+       /**
+        * Delay between stack trace samples to determine back pressure.
+        *
+        * @deprecated Use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} 
instead.
+        */
+       @Deprecated
        public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 
"jobmanager.web.backpressure.delay-between-samples";
 
        // ------------------------------ AKKA 
------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
new file mode 100644
index 0000000..2bc2498
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Configuration options for the JobManager.
+ */
+@PublicEvolving
+public class JobManagerOptions {
+
+       /**
+        * The config parameter defining the network address to connect to
+        * for communication with the job manager.
+        */
+       public static final ConfigOption<String> ADDRESS = ConfigOptions
+               .key("jobmanager.rpc.address")
+               .noDefaultValue();
+
+       /**
+        * The config parameter defining the network port to connect to
+        * for communication with the job manager.
+        */
+       public static final ConfigOption<Integer> PORT = ConfigOptions
+               .key("jobmanager.rpc.port")
+               .defaultValue(6123);
+
+       /**
+        * The port for the runtime monitor web-frontend server.
+        */
+       public static final ConfigOption<Integer> WEB_PORT = ConfigOptions
+               .key("jobmanager.web.port")
+               .defaultValue(8081);
+
+       /**
+        * Config parameter to override SSL support for the JobManager Web UI
+        */
+       public static final ConfigOption<Boolean> WEB_SSL_ENABLED = 
ConfigOptions
+               .key("jobmanager.web.ssl.enabled")
+               .defaultValue(true);
+
+       /**
+        * The config parameter defining the flink web directory to be used by 
the webmonitor.
+        */
+       public static final ConfigOption<String> WEB_TMP_DIR = ConfigOptions
+               .key("jobmanager.web.tmpdir")
+               .defaultValue(System.getProperty("java.io.tmpdir"));
+
+       /**
+        * The config parameter defining the directory for uploading the job 
jars. If not specified a dynamic directory
+        * will be used under the directory specified by 
JOB_MANAGER_WEB_TMPDIR_KEY.
+        */
+       public static final ConfigOption<String> WEB_UPLOAD_DIR = ConfigOptions
+               .key("jobmanager.web.upload.dir")
+               .noDefaultValue();
+
+       /**
+        * The config parameter defining the number of archived jobs for the 
jobmanager.
+        */
+       public static final ConfigOption<Integer> WEB_ARCHIVE_COUNT = 
ConfigOptions
+               .key("jobmanager.web.history")
+               .defaultValue(5);
+
+       /**
+        * The log file location (may be in /log for standalone but under log 
directory when using YARN).
+        */
+       public static final ConfigOption<String> WEB_LOG_PATH = ConfigOptions
+               .key("jobmanager.web.log.path")
+               .noDefaultValue();
+
+       /**
+        * Config parameter indicating whether jobs can be uploaded and run 
from the web-frontend.
+        */
+       public static final ConfigOption<Boolean> WEB_SUBMIT_ENABLE = 
ConfigOptions
+               .key("jobmanager.web.submit.enable")
+               .defaultValue(true);
+
+       /**
+        * Config parameter defining the number of checkpoints to remember for 
recent history.
+        */
+       public static final ConfigOption<Integer> WEB_CHECKPOINTS_HISTORY_SIZE 
= ConfigOptions
+               .key("jobmanager.web.checkpoints.history")
+               .defaultValue(10);
+
+       /**
+        * Time after which cached stats are cleaned up if not accessed.
+        */
+       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_CLEANUP_INTERVAL = ConfigOptions
+               .key("jobmanager.web.backpressure.cleanup-interval")
+               .defaultValue(10 * 60 * 1000);
+
+       /**
+        * Time after which available stats are deprecated and need to be 
refreshed (by resampling).
+        */
+       public static final ConfigOption<Integer> 
WEB_BACKPRESSURE_REFRESH_INTERVAL = ConfigOptions
+               .key("jobmanager.web.backpressure.refresh-interval")
+               .defaultValue(60 * 1000);
+
+       /**
+        * Number of stack trace samples to take to determine back pressure.
+        */
+       public static final ConfigOption<Integer> WEB_BACKPRESSURE_NUM_SAMPLES 
= ConfigOptions
+               .key("jobmanager.web.backpressure.num-samples")
+               .defaultValue(100);
+
+       /**
+        * Delay between stack trace samples to determine back pressure.
+        */
+       public static final ConfigOption<Integer> WEB_BACKPRESSURE_DELAY = 
ConfigOptions
+               .key("jobmanager.web.backpressure.delay-between-samples")
+               .defaultValue(50);
+
+       /**
+        * The maximum number of prior execution attempts kept in history.
+        */
+       public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE = 
ConfigOptions
+               .key("job-manager.max-attempts-history-size")
+               .defaultValue(16);
+
+       // 
---------------------------------------------------------------------------------------------
+
+       private JobManagerOptions() {
+               throw new IllegalAccessError();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 545315f..c9b25bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -40,7 +40,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 9693b97..c7829fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.state.TaskStateHandles;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2ca1295/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
deleted file mode 100644
index 279a70e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.ConfigOption;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-@PublicEvolving
-public class JobManagerOptions {
-
-       /**
-        * The maximum number of prior execution attempts kept in history.
-        */
-       public static final ConfigOption<Integer> MAX_ATTEMPTS_HISTORY_SIZE =
-                       
key("job-manager.max-attempts-history-size").defaultValue(16);
-
-       private JobManagerOptions() {
-               throw new IllegalAccessError();
-       }
-}

Reply via email to