[FLINK-4770] [core] Introduce 'CoreOptions'

The CoreOptions should hold all essential configuration values that are not
specific to JobManager, TaskManager, or any feature area, like HighAvailability
or Security.

Examples of such values are:
  - default java options
  - default parallelism
  - default state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4047965
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4047965
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4047965

Branch: refs/heads/master
Commit: a4047965898adc0ba7bf74280a3b22792ced3399
Parents: 544f534
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 17 14:57:20 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Feb 20 21:29:16 2017 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java |  3 +-
 .../flink/configuration/ConfigConstants.java    | 11 +++++-
 .../apache/flink/configuration/CoreOptions.java | 40 ++++++++++++++++++++
 .../org/apache/flink/hdfstests/HDFSTest.java    |  4 +-
 .../clusterframework/BootstrapTools.java        |  3 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  4 +-
 .../clusterframework/BootstrapToolsTest.java    |  3 +-
 .../BlobLibraryCacheRecoveryITCase.java         |  4 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  3 +-
 .../streaming/runtime/tasks/StreamTask.java     |  6 +--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  6 +--
 .../test/checkpointing/RescalingITCase.java     |  3 +-
 .../test/checkpointing/SavepointITCase.java     |  5 ++-
 .../utils/SavepointMigrationTestBase.java       |  3 +-
 .../test/classloading/ClassLoaderITCase.java    |  3 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  4 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |  3 +-
 19 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index fa46fc7..768ca5e 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -216,7 +217,7 @@ public class RollingSinkSecuredITCase extends 
RollingSinkITCase {
                        
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
                        config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-                       config.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
+                       config.setString(CoreOptions.STATE_BACKEND, 
"filesystem");
                        
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + 
"/flink/checkpoints");
                        
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + 
"/flink/recovery");
                        config.setString("state.backend.fs.checkpointdir", 
hdfsURI + "/flink/checkpoints");

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 44a78f9..5129f20 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -741,8 +741,11 @@ public final class ConfigConstants {
        // ----------------------------- Streaming 
--------------------------------
        
        /**
-        * State backend for checkpoints;
+        * State backend for checkpoints
+        * 
+        * @deprecated Use {@link CoreOptions#STATE_BACKEND} instead.
         */
+       @Deprecated
        public static final String STATE_BACKEND = "state.backend";
        
        // ----------------------------- Miscellaneous 
----------------------------
@@ -756,7 +759,11 @@ public final class ConfigConstants {
         */
        @Deprecated
        public static final String FLINK_BASE_DIR_PATH_KEY = 
"flink.base.dir.path";
-       
+
+       /**
+        * @deprecated Use {@link CoreOptions#FLINK_JVM_OPTIONS} instead.
+        */
+       @Deprecated
        public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
        // --------------------------- High Availability 
--------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
new file mode 100644
index 0000000..70e5f0b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+@PublicEvolving
+public class CoreOptions {
+
+       /**
+        * 
+        */
+       public static final ConfigOption<String> FLINK_JVM_OPTIONS = 
ConfigOptions
+               .key("env.java.opts")
+               .defaultValue("");
+
+       public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = 
ConfigOptions
+               .key("parallelism.default")
+               .defaultValue(-1);
+       
+       public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+               .key("state.backend")
+               .noDefaultValue();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 49db0f8..75e666f 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -230,7 +230,7 @@ public class HDFSTest {
                org.apache.flink.configuration.Configuration
                        config = new 
org.apache.flink.configuration.Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+               config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
 
                BlobRecoveryITCase.testBlobServerRecovery(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index a0cf1d5..ebc9af8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -27,6 +27,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -358,7 +359,7 @@ public class BootstrapTools {
                        .put("jvmmem",  "-Xms" + 
tmParams.taskManagerHeapSizeMB() + "m " +
                                                        "-Xmx" + 
tmParams.taskManagerHeapSizeMB() + "m " +
                                                        
"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
-               String javaOpts = 
flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+               String javaOpts = 
flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
                //applicable only for YarnMiniCluster secure test run
                //krb5.conf file will be available as local resource in JM/TM 
container
                if(hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index d043665..a7f792f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
        public void testBlobServerRecovery() throws Exception {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+               config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getPath());
 
                testBlobServerRecovery(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index b08e1f4..1d100da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -209,7 +210,7 @@ public class BootstrapToolsTest {
                                        true, true, true, this.getClass()));
 
                // logback + log4j, with/out krb5, different JVM opts
-               cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+               cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
                assertEquals(
                        java + " " + jvmmem +
                                " " + jvmOpts +

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index d3925be..7f75acc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
 
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+               config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 07cec32..42338cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -80,7 +81,7 @@ public class ZooKeeperTestUtils {
                
config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 
connTimeout);
 
                // File system state backend
-               config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+               config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
                
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
fsStateHandlePath + "/checkpoints");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
fsStateHandlePath + "/recovery");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6b33d12..d734dc9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -694,7 +694,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                } else {
                        // see if we have a backend specified in the 
configuration
                        Configuration flinkConfig = 
getEnvironment().getTaskManagerInfo().getConfiguration();
-                       String backendName = 
flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
+                       String backendName = 
flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
 
                        if (backendName == null) {
                                LOG.warn("No state backend has been specified, 
using default state backend (Memory / JobManager)");
@@ -731,7 +731,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                                throw new 
IllegalConfigurationException("Cannot find configured state backend: " + 
backendName);
                                        } catch (ClassCastException e) {
                                                throw new 
IllegalConfigurationException("The class configured under '" +
-                                                               
ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+                                                               
CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
                                                                backendName + 
')');
                                        } catch (Throwable t) {
                                                throw new 
IllegalConfigurationException("Cannot create configured state backend", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 508d2e1..6f0e881 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -1074,7 +1074,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        private static StreamTask<?, ?> createMockTask() {
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.STATE_BACKEND, 
"jobmanager");
+               configuration.setString(CoreOptions.STATE_BACKEND, 
"jobmanager");
 
                StreamTask<?, ?> task = mock(StreamTask.class);
                when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a63ee8..887ea4f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -192,7 +192,7 @@ public class StreamTaskTest extends TestLogger {
        @Test
        public void testStateBackendLoadingAndClosing() throws Exception {
                Configuration taskManagerConfig = new Configuration();
-               taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, 
MockStateBackend.class.getName());
+               taskManagerConfig.setString(CoreOptions.STATE_BACKEND, 
MockStateBackend.class.getName());
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setStreamOperator(new StreamSource<>(new 
MockSourceFunction()));
@@ -216,7 +216,7 @@ public class StreamTaskTest extends TestLogger {
        @Test
        public void testStateBackendClosingOnFailure() throws Exception {
                Configuration taskManagerConfig = new Configuration();
-               taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, 
MockStateBackend.class.getName());
+               taskManagerConfig.setString(CoreOptions.STATE_BACKEND, 
MockStateBackend.class.getName());
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setStreamOperator(new StreamSource<>(new 
MockSourceFunction()));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 073632a..875d0ed 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -105,7 +106,7 @@ public class RescalingITCase extends TestLogger {
                final File checkpointDir = temporaryFolder.newFolder();
                final File savepointDir = temporaryFolder.newFolder();
 
-               config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+               config.setString(CoreOptions.STATE_BACKEND, "filesystem");
                
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
checkpointDir.toURI().toString());
                config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, 
savepointDir.toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 77777d1..128522b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -168,7 +169,7 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("Created temporary checkpoint directory: " + 
checkpointDir + ".");
                        LOG.info("Created temporary savepoint directory: " + 
savepointDir + ".");
 
-                       config.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
+                       config.setString(CoreOptions.STATE_BACKEND, 
"filesystem");
                        
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
                                checkpointDir.toURI().toString());
                        
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
@@ -701,7 +702,7 @@ public class SavepointITCase extends TestLogger {
                        fail("Test setup failed: failed to create temporary 
directories.");
                }
 
-               config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+               config.setString(CoreOptions.STATE_BACKEND, "filesystem");
                
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
                                checkpointDir.toURI().toString());
                
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1a8a0a0..fced68c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -90,7 +91,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils 
{
                LOG.info("Created temporary checkpoint directory: " + 
checkpointDir + ".");
                LOG.info("Created savepoint directory: " + savepointDir + ".");
 
-               config.setString(ConfigConstants.STATE_BACKEND, "memory");
+               config.setString(CoreOptions.STATE_BACKEND, "memory");
                
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
checkpointDir.toURI().toString());
                
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
                config.setString("state.savepoints.dir", 
savepointDir.toURI().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index ca69e80..f25a302 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -97,7 +98,7 @@ public class ClassLoaderITCase extends TestLogger {
                parallelism = 4;
 
                // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
-               config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+               config.setString(CoreOptions.STATE_BACKEND, "filesystem");
                config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
                                FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index d959e14..546e3d7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -118,7 +119,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
                flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
                flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
                        zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-                       "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+                       "@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM" +
                        "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
                        "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 21599c1..edf57b3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
@@ -1232,8 +1233,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                // ------------------ Prepare Application Master Container  ------------------------------
 
                // respect custom JVM options in the YAML file
-               String javaOpts =
-                       flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+               String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
                //applicable only for YarnMiniCluster secure test run
                //krb5.conf file will be available as local resource in JM/TM container
                if (hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 70ccae8..ad3ebcd 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.Path;
@@ -203,7 +204,7 @@ public class YarnClusterDescriptorTest {
                                .getCommands().get(0));
 
                // logback + log4j, with/out krb5, different JVM opts
-               cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+               cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
                assertEquals(
                        java + " " + jvmmem +
                                " " + jvmOpts +

Reply via email to