[FLINK-4770] [core] Introduce 'CoreOptions' The CoreOptions should hold all essential configuration values that are not specific to JobManager, TaskManager or any feature area, like HighAvailability or Security.
Examples for that are - default java options - default parallelism - default state backend Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4047965 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4047965 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4047965 Branch: refs/heads/master Commit: a4047965898adc0ba7bf74280a3b22792ced3399 Parents: 544f534 Author: Stephan Ewen <[email protected]> Authored: Fri Feb 17 14:57:20 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Feb 20 21:29:16 2017 +0100 ---------------------------------------------------------------------- .../connectors/fs/RollingSinkSecuredITCase.java | 3 +- .../flink/configuration/ConfigConstants.java | 11 +++++- .../apache/flink/configuration/CoreOptions.java | 40 ++++++++++++++++++++ .../org/apache/flink/hdfstests/HDFSTest.java | 4 +- .../clusterframework/BootstrapTools.java | 3 +- .../flink/runtime/blob/BlobRecoveryITCase.java | 4 +- .../clusterframework/BootstrapToolsTest.java | 3 +- .../BlobLibraryCacheRecoveryITCase.java | 4 +- .../runtime/testutils/ZooKeeperTestUtils.java | 3 +- .../streaming/runtime/tasks/StreamTask.java | 6 +-- ...AlignedProcessingTimeWindowOperatorTest.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 6 +-- .../test/checkpointing/RescalingITCase.java | 3 +- .../test/checkpointing/SavepointITCase.java | 5 ++- .../utils/SavepointMigrationTestBase.java | 3 +- .../test/classloading/ClassLoaderITCase.java | 3 +- .../flink/yarn/YARNHighAvailabilityITCase.java | 3 +- .../yarn/AbstractYarnClusterDescriptor.java | 4 +- .../flink/yarn/YarnClusterDescriptorTest.java | 3 +- 19 files changed, 86 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index fa46fc7..768ca5e 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.security.SecurityUtils; @@ -216,7 +217,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + config.setString(CoreOptions.STATE_BACKEND, "filesystem"); config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 44a78f9..5129f20 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -741,8 +741,11 @@ public final class ConfigConstants { // ----------------------------- Streaming -------------------------------- /** - * State backend for checkpoints; + * State backend for checkpoints + * + * @deprecated Use {@link CoreOptions#STATE_BACKEND} instead. */ + @Deprecated public static final String STATE_BACKEND = "state.backend"; // ----------------------------- Miscellaneous ---------------------------- @@ -756,7 +759,11 @@ public final class ConfigConstants { */ @Deprecated public static final String FLINK_BASE_DIR_PATH_KEY = "flink.base.dir.path"; - + + /** + * @deprecated Use {@link CoreOptions#FLINK_JVM_OPTIONS} instead. 
+ */ + @Deprecated public static final String FLINK_JVM_OPTIONS = "env.java.opts"; // --------------------------- High Availability -------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java new file mode 100644 index 0000000..70e5f0b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +@PublicEvolving +public class CoreOptions { + + /** + * + */ + public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions + .key("env.java.opts") + .defaultValue(""); + + public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions + .key("parallelism.default") + .defaultValue(-1); + + public static final ConfigOption<String> STATE_BACKEND = ConfigOptions + .key("state.backend") + .noDefaultValue(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 49db0f8..75e666f 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -230,7 +230,7 @@ public class HDFSTest { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER"); + config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); 
BlobRecoveryITCase.testBlobServerRecovery(config); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index a0cf1d5..ebc9af8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -27,6 +27,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -358,7 +359,7 @@ public class BootstrapTools { .put("jvmmem", "-Xms" + tmParams.taskManagerHeapSizeMB() + "m " + "-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " + "-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m"); - String javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); + String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS); //applicable only for YarnMiniCluster secure test run //krb5.conf file will be available as local resource in JM/TM container if(hasKrb5) { http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index d043665..a7f792f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -55,7 +55,7 @@ public class BlobRecoveryITCase { public void testBlobServerRecovery() throws Exception { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); testBlobServerRecovery(config); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java index b08e1f4..1d100da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; 
+import org.apache.flink.configuration.CoreOptions; import org.junit.Test; import java.util.HashMap; @@ -209,7 +210,7 @@ public class BootstrapToolsTest { true, true, true, this.getClass())); // logback + log4j, with/out krb5, different JVM opts - cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts); + cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); assertEquals( java + " " + jvmmem + " " + jvmOpts + http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index d3925be..7f75acc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.execution.librarycache; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; @@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath()); try { http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 07cec32..42338cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; @@ -80,7 +81,7 @@ public class ZooKeeperTestUtils { config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout); // File system state backend - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery"); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 6b33d12..d734dc9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -694,7 +694,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } else { // see if we have a backend specified in the configuration Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); - String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null); + String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null); if (backendName == null) { LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)"); @@ -731,7 +731,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName); } catch (ClassCastException e) { throw new IllegalConfigurationException("The class configured under '" + - ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" + + CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" + backendName + ')'); } catch (Throwable t) { throw new IllegalConfigurationException("Cannot create configured 
state backend", t); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 508d2e1..6f0e881 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.state.StreamStateHandle; @@ -1074,7 +1074,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { private static StreamTask<?, ?> createMockTask() { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); + configuration.setString(CoreOptions.STATE_BACKEND, "jobmanager"); StreamTask<?, ?> task = mock(StreamTask.class); when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>()); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 6a63ee8..887ea4f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; @@ -192,7 +192,7 @@ public class StreamTaskTest extends TestLogger { @Test public void testStateBackendLoadingAndClosing() throws Exception { Configuration taskManagerConfig = new Configuration(); - taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName()); + taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName()); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction())); @@ -216,7 +216,7 @@ public class StreamTaskTest extends TestLogger { @Test public void testStateBackendClosingOnFailure() throws Exception { Configuration taskManagerConfig = new Configuration(); - taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, 
MockStateBackend.class.getName()); + taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName()); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction())); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 073632a..875d0ed 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.instance.ActorGateway; @@ -105,7 +106,7 @@ public class RescalingITCase extends TestLogger { final File checkpointDir = temporaryFolder.newFolder(); final File savepointDir = temporaryFolder.newFolder(); - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + config.setString(CoreOptions.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java 
---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 77777d1..128522b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.checkpoint.SubtaskState; @@ -168,7 +169,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Created temporary checkpoint directory: " + checkpointDir + "."); LOG.info("Created temporary savepoint directory: " + savepointDir + "."); - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + config.setString(CoreOptions.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0"); @@ -701,7 +702,7 @@ public class SavepointITCase extends TestLogger { fail("Test setup failed: failed to create temporary directories."); } - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + config.setString(CoreOptions.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index 1a8a0a0..fced68c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -90,7 +91,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils { LOG.info("Created temporary checkpoint directory: " + checkpointDir + "."); LOG.info("Created savepoint directory: " + savepointDir + "."); - config.setString(ConfigConstants.STATE_BACKEND, "memory"); + config.setString(CoreOptions.STATE_BACKEND, "memory"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0"); config.setString("state.savepoints.dir", savepointDir.toURI().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index ca69e80..f25a302 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; @@ -97,7 +98,7 @@ public class ClassLoaderITCase extends TestLogger { parallelism = 4; // we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again. - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + config.setString(CoreOptions.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, FOLDER.newFolder().getAbsoluteFile().toURI().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index d959e14..546e3d7 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.client.program.ClusterClient; import 
org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -118,7 +119,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration()); flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + - "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + + "@@" + CoreOptions.STATE_BACKEND.key() + "=FILESYSTEM" + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 21599c1..edf57b3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -21,6 +21,7 @@ package org.apache.flink.yarn; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.CoreOptions; import 
org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.SecurityOptions; @@ -1232,8 +1233,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // ------------------ Prepare Application Master Container ------------------------------ // respect custom JVM options in the YAML file - String javaOpts = - flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, ""); + String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS); //applicable only for YarnMiniCluster secure test run //krb5.conf file will be available as local resource in JM/TM container if (hasKrb5) { http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 70ccae8..ad3ebcd 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.fs.Path; @@ -203,7 +204,7 @@ public class YarnClusterDescriptorTest { .getCommands().get(0)); // logback + log4j, with/out krb5, different JVM opts - cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts); + cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts); assertEquals( java + " " + jvmmem + " " + jvmOpts +
