This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38f7b51d0cc45293dc71ad31607ecc685b11498f
Author: sxnan <suxuanna...@gmail.com>
AuthorDate: Tue Jan 9 17:50:34 2024 +0800

    [FLINK-34083][config] Deprecate string configuration keys and unused 
constants in ConfigConstants
---
 .../generated/all_taskmanager_section.html         | 18 +++++++++
 .../generated/task_manager_configuration.html      | 18 +++++++++
 .../generated/yarn_config_configuration.html       |  6 +++
 .../client/deployment/executors/LocalExecutor.java |  3 +-
 .../client/program/PerJobMiniClusterFactory.java   |  6 +--
 .../flink/api/common/io/FileInputFormat.java       | 11 ++----
 .../flink/configuration/ConfigConstants.java       | 44 +++++++++++++++-------
 .../flink/configuration/TaskManagerOptions.java    | 25 ++++++++++++
 .../itcases/HAQueryableStateFsBackendITCase.java   |  5 +--
 .../HAQueryableStateRocksDBBackendITCase.java      |  5 +--
 .../NonHAQueryableStateFsBackendITCase.java        |  3 +-
 .../NonHAQueryableStateRocksDBBackendITCase.java   |  3 +-
 .../runtime/webmonitor/WebFrontendITCase.java      |  4 +-
 .../taskexecutor/TaskManagerConfiguration.java     |  4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |  4 +-
 .../table/client/gateway/ExecutorImplITCase.java   |  2 +-
 .../utils/SnapshotMigrationTestBase.java           |  3 +-
 .../src/main/java/org/apache/flink/yarn/Utils.java |  6 +--
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  5 +--
 .../yarn/configuration/YarnConfigOptions.java      | 19 ++++++++++
 .../test/java/org/apache/flink/yarn/UtilsTest.java |  9 +++--
 .../flink/yarn/YarnClusterDescriptorTest.java      |  9 +++--
 22 files changed, 147 insertions(+), 65 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_section.html 
b/docs/layouts/shortcodes/generated/all_taskmanager_section.html
index 0e7ba018327..fa6f069d80c 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_section.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>minicluster.number-of-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The number of task managers of MiniCluster.</td>
+        </tr>
         <tr>
             <td><h5>task.cancellation.interval</h5></td>
             <td style="word-wrap: break-word;">30000</td>
@@ -86,6 +92,12 @@
             <td><p>Enum</p></td>
             <td>Mode for the load-balance allocation strategy across all 
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code 
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots 
evenly across all available <code 
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code 
class="highlighter-rouge">NONE</code> mode is the default mode without any 
specified strategy.</li></ul><br /><br />Possible 
values:<ul><li>"NONE"</li><li>"SLOTS"</li [...]
         </tr>
+        <tr>
+            <td><h5>taskmanager.log.path</h5></td>
+            <td style="word-wrap: 
break-word;">System.getProperty("log.file")</td>
+            <td>String</td>
+            <td>The path to the log file of the task manager.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.memory.min-segment-size</h5></td>
             <td style="word-wrap: break-word;">256 bytes</td>
@@ -135,6 +147,12 @@
             <td>String</td>
             <td>The external RPC port where the TaskManager is exposed. 
Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a 
combination of both. It is recommended to set a range of ports to avoid 
collisions when multiple TaskManagers are running on the same machine.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.runtime.fs-timeout</h5></td>
+            <td style="word-wrap: break-word;">0 ms</td>
+            <td>Duration</td>
+            <td>The timeout for filesystem stream opening. A value of 0 
indicates infinite waiting.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.slot.timeout</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/task_manager_configuration.html 
b/docs/layouts/shortcodes/generated/task_manager_configuration.html
index d9ba88f35c0..52616e1ec79 100644
--- a/docs/layouts/shortcodes/generated/task_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/task_manager_configuration.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>minicluster.number-of-taskmanagers</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The number of task managers of MiniCluster.</td>
+        </tr>
         <tr>
             <td><h5>task.cancellation.interval</h5></td>
             <td style="word-wrap: break-word;">30000</td>
@@ -68,6 +74,12 @@
             <td><p>Enum</p></td>
             <td>Mode for the load-balance allocation strategy across all 
available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code 
class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots 
evenly across all available <code 
class="highlighter-rouge">TaskManagers</code>.</li><li>The <code 
class="highlighter-rouge">NONE</code> mode is the default mode without any 
specified strategy.</li></ul><br /><br />Possible 
values:<ul><li>"NONE"</li><li>"SLOTS"</li [...]
         </tr>
+        <tr>
+            <td><h5>taskmanager.log.path</h5></td>
+            <td style="word-wrap: 
break-word;">System.getProperty("log.file")</td>
+            <td>String</td>
+            <td>The path to the log file of the task manager.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.bind-policy</h5></td>
             <td style="word-wrap: break-word;">"ip"</td>
@@ -135,6 +147,12 @@
             <td>String</td>
             <td>The external RPC port where the TaskManager is exposed. 
Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a 
combination of both. It is recommended to set a range of ports to avoid 
collisions when multiple TaskManagers are running on the same machine.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.runtime.fs-timeout</h5></td>
+            <td style="word-wrap: break-word;">0 ms</td>
+            <td>Duration</td>
+            <td>The timeout for filesystem stream opening. A value of 0 
indicates infinite waiting.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.slot.timeout</h5></td>
             <td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html 
b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
index b747cf93274..901d12b00e3 100644
--- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html
@@ -92,6 +92,12 @@
             <td><p>Enum</p></td>
             <td>Defines whether user-jars are included in the system class 
path as well as their positioning in the path.<br /><br />Possible 
values:<ul><li>"DISABLED": Exclude user jars from the system class 
path</li><li>"FIRST": Position at the beginning</li><li>"LAST": Position at the 
end</li><li>"ORDER": Position based on the name of the jar</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>yarn.container-start-command-template</h5></td>
+            <td style="word-wrap: break-word;">"%java% %jvmmem% %jvmopts% 
%logging% %class% %args% %redirects%"</td>
+            <td>String</td>
+            <td>This configuration parameter allows users to pass custom 
settings (such as JVM paths, arguments etc.) to start the YARN. The following 
placeholders will be replaced: <ul><li>%java%: Path to the Java 
executable</li><li>%jvmmem%: JVM memory limits and tweaks</li><li>%jvmopts%: 
Options for the Java VM</li><li>%logging%: Logging-related configuration 
settings</li><li>%class%: Main class to execute</li><li>%args%: Arguments for 
the main class</li><li>%redirects%: Output redire [...]
+        </tr>
         <tr>
             <td><h5>yarn.containers.vcores</h5></td>
             <td style="word-wrap: break-word;">-1</td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index 45847a6a62d..898d2b733c9 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.program.PerJobMiniClusterFactory;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -97,7 +96,7 @@ public class LocalExecutor implements PipelineExecutor {
                     configuration.getInteger(
                             TaskManagerOptions.NUM_TASK_SLOTS, 
plan.getMaximumParallelism());
             final int numTaskManagers =
-                    
configuration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+                    
configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS);
 
             plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
         }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
index 7dc4585e58d..857946d47e5 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.RestOptions;
@@ -124,10 +123,7 @@ public final class PerJobMiniClusterFactory {
             configuration.setString(RestOptions.BIND_PORT, "0");
         }
 
-        int numTaskManagers =
-                configuration.getInteger(
-                        ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
-                        ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
+        int numTaskManagers = 
configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS);
 
         Map<String, String> overwriteParallelisms =
                 configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index b8c57eb6615..c3fc58fedb0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.XZInputStreamFactory;
 import org.apache.flink.api.common.io.compression.ZStandardInputStreamFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
@@ -50,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.flink.configuration.TaskManagerOptions.FS_STREAM_OPENING_TIME_OUT;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -98,17 +98,14 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
      * @param configuration The configuration to load defaults from
      */
     private static void initDefaultsFromConfiguration(Configuration 
configuration) {
-        final long to =
-                configuration.getLong(
-                        ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
-                        ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
+        final long to = 
configuration.get(FS_STREAM_OPENING_TIME_OUT).toMillis();
         if (to < 0) {
             LOG.error(
                     "Invalid timeout value for filesystem stream opening: "
                             + to
                             + ". Using default value of "
-                            + 
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
-            DEFAULT_OPENING_TIMEOUT = 
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT;
+                            + 
FS_STREAM_OPENING_TIME_OUT.defaultValue().toMillis());
+            DEFAULT_OPENING_TIMEOUT = 
FS_STREAM_OPENING_TIME_OUT.defaultValue().toMillis();
         } else if (to == 0) {
             DEFAULT_OPENING_TIMEOUT = 300000; // 5 minutes
         } else {
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 37e081e47cf..ad7857af06d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -215,8 +215,12 @@ public final class ConfigConstants {
      */
     @Deprecated public static final String TASK_MANAGER_TMP_DIR_KEY = 
"taskmanager.tmp.dirs";
 
-    /** The config parameter defining the taskmanager log file location. */
-    public static final String TASK_MANAGER_LOG_PATH_KEY = 
"taskmanager.log.path";
+    /**
+     * The config parameter defining the taskmanager log file location.
+     *
+     * @deprecated Use {@link TaskManagerOptions#TASK_MANAGER_LOG_PATH} 
instead.
+     */
+    @Deprecated public static final String TASK_MANAGER_LOG_PATH_KEY = 
"taskmanager.log.path";
 
     /** @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead 
*/
     @Deprecated public static final String TASK_MANAGER_MEMORY_SIZE_KEY = 
"taskmanager.memory.size";
@@ -350,7 +354,10 @@ public final class ConfigConstants {
     /**
      * The config parameter defining the timeout for filesystem stream 
opening. A value of 0
      * indicates infinite waiting.
+     *
+     * @deprecated use {@link TaskManagerOptions#FS_STREAM_OPENING_TIME_OUT} 
instead.
      */
+    @Deprecated
     public static final String FS_STREAM_OPENING_TIMEOUT_KEY = 
"taskmanager.runtime.fs_timeout";
 
     /**
@@ -496,7 +503,12 @@ public final class ConfigConstants {
      */
     @Deprecated public static final String YARN_TASK_MANAGER_ENV_PREFIX = 
"yarn.taskmanager.env.";
 
-    /** Template for the YARN container start invocation. */
+    /**
+     * Template for the YARN container start invocation.
+     *
+     * @deprecated in favor of {@code 
YarnConfigOptions#YARN_CONTAINER_START_COMMAND_TEMPLATE}.
+     */
+    @Deprecated
     public static final String YARN_CONTAINER_START_COMMAND_TEMPLATE =
             "yarn.container-start-command-template";
 
@@ -929,7 +941,7 @@ public final class ConfigConstants {
     @PublicEvolving @Deprecated public static final String HA_MODE = 
"high-availability";
 
     /** Ports used by the job manager if not in 'none' recovery mode. */
-    @PublicEvolving
+    @Deprecated
     public static final String HA_JOB_MANAGER_PORT = 
"high-availability.jobmanager.port";
 
     /** @deprecated Deprecated in favour of {@link #HA_MODE}. */
@@ -1203,7 +1215,7 @@ public final class ConfigConstants {
     @Deprecated public static final int DEFAULT_PARALLELISM = 1;
 
     /** The default number of execution retries. */
-    public static final int DEFAULT_EXECUTION_RETRIES = 0;
+    @Deprecated public static final int DEFAULT_EXECUTION_RETRIES = 0;
 
     // ------------------------------ Runtime ---------------------------------
 
@@ -1350,7 +1362,7 @@ public final class ConfigConstants {
     /**
      * The default timeout for filesystem stream opening: infinite (means max 
long milliseconds).
      */
-    public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
+    @Deprecated public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
 
     /**
      * Whether to use the LargeRecordHandler when spilling.
@@ -1416,10 +1428,10 @@ public final class ConfigConstants {
      * The default filesystem to be used, if no other scheme is specified in 
the user-provided URI
      * (= local filesystem).
      */
-    public static final String DEFAULT_FILESYSTEM_SCHEME = "file:///";
+    @Deprecated public static final String DEFAULT_FILESYSTEM_SCHEME = 
"file:///";
 
     /** The default behavior with respect to overwriting existing files (= not 
overwrite). */
-    public static final boolean DEFAULT_FILESYSTEM_OVERWRITE = false;
+    @Deprecated public static final boolean DEFAULT_FILESYSTEM_OVERWRITE = 
false;
 
     /**
      * The default behavior for output directory creating (create only 
directory when parallelism
@@ -1585,18 +1597,22 @@ public final class ConfigConstants {
 
     // ----------------------------- Streaming Values 
--------------------------
 
-    public static final String DEFAULT_STATE_BACKEND = "jobmanager";
+    @Deprecated public static final String DEFAULT_STATE_BACKEND = 
"jobmanager";
 
     // ----------------------------- LocalExecution 
----------------------------
 
-    /** Sets the number of local task managers. */
-    public static final String LOCAL_NUMBER_TASK_MANAGER = 
"local.number-taskmanager";
+    /**
+     * Sets the number of local task managers.
+     *
+     * @deprecated use {@link 
TaskManagerOptions#MINI_CLUSTER_NUM_TASK_MANAGERS} instead
+     */
+    @Deprecated public static final String LOCAL_NUMBER_TASK_MANAGER = 
"local.number-taskmanager";
 
     public static final int DEFAULT_LOCAL_NUMBER_TASK_MANAGER = 1;
 
-    public static final String LOCAL_NUMBER_JOB_MANAGER = 
"local.number-jobmanager";
+    @Deprecated public static final String LOCAL_NUMBER_JOB_MANAGER = 
"local.number-jobmanager";
 
-    public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
+    @Deprecated public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
 
     /** @deprecated Use {@link 
ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead. */
     @Deprecated
@@ -1753,7 +1769,7 @@ public final class ConfigConstants {
     public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
 
     /** The environment variable name which contains the location of the bin 
directory. */
-    public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
+    @Deprecated public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
 
     /** The environment variable name which contains the Flink installation 
root directory. */
     public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 2c0a9801ac4..4c8f250c494 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -808,6 +808,31 @@ public class TaskManagerOptions {
                                                     
code(TaskManagerLoadBalanceMode.NONE.name())))
                                     .build());
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
+    @Documentation.OverrideDefault("System.getProperty(\"log.file\")")
+    public static final ConfigOption<String> TASK_MANAGER_LOG_PATH =
+            ConfigOptions.key("taskmanager.log.path")
+                    .stringType()
+                    .defaultValue(System.getProperty("log.file"))
+                    .withDescription("The path to the log file of the task 
manager.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
+    public static final ConfigOption<Duration> FS_STREAM_OPENING_TIME_OUT =
+            ConfigOptions.key("taskmanager.runtime.fs-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ZERO)
+                    .withDeprecatedKeys("taskmanager.runtime.fs_timeout")
+                    .withDescription(
+                            "The timeout for filesystem stream opening. A 
value of 0 indicates infinite waiting.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
+    public static final ConfigOption<Integer> MINI_CLUSTER_NUM_TASK_MANAGERS =
+            ConfigOptions.key("minicluster.number-of-taskmanagers")
+                    .intType()
+                    .defaultValue(1)
+                    .withDeprecatedKeys("local.number-taskmanager")
+                    .withDescription("The number of task managers of 
MiniCluster.");
+
     /** Type of redirection of {@link System#out} and {@link System#err}. */
     public enum SystemOutMode {
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 2465c6052cf..cdb0f33277e 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.queryablestate.itcases;
 
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -46,7 +45,6 @@ import java.nio.file.Path;
 /** Several integration tests for queryable state using the {@link 
FsStateBackend}. */
 class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase {
 
-    private static final int NUM_JMS = 2;
     // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines 
so that
     // we always use all TaskManagers so that the JM oracle is always properly 
re-registered
     private static final int NUM_TMS = 2;
@@ -104,8 +102,7 @@ class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
         Configuration config = new Configuration();
         
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("4m"));
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+        config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
         config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
         config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 3e7cc014997..b8599eb2836 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.queryablestate.itcases;
 
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -46,7 +45,6 @@ import java.nio.file.Path;
 /** Several integration tests for queryable state using the {@link 
RocksDBStateBackend}. */
 class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBase {
 
-    private static final int NUM_JMS = 2;
     // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines 
so that
     // we always use all TaskManagers so that the JM oracle is always properly 
re-registered
     private static final int NUM_TMS = 2;
@@ -102,8 +100,7 @@ class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBas
         Configuration config = new Configuration();
         
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("4m"));
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+        config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
         config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
         config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index 26c7d366bf3..7e4a4f1059f 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.queryablestate.itcases;
 
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.QueryableStateOptions;
@@ -85,7 +84,7 @@ public class NonHAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTe
         Configuration config = new Configuration();
         
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("4m"));
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+        config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
         config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
         config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 6e8e0e8be6f..22306bf5130 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.queryablestate.itcases;
 
 import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.QueryableStateOptions;
@@ -84,7 +83,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableSt
         Configuration config = new Configuration();
         
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("4m"));
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+        config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
         config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
         config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index b677a094d54..c190d4fc354 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -107,8 +107,8 @@ class WebFrontendITCase {
             Files.createFile(outFile);
 
             config.setString(WebOptions.LOG_PATH, 
logFile.toAbsolutePath().toString());
-            config.setString(
-                    ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.toAbsolutePath().toString());
+            config.set(
+                    TaskManagerOptions.TASK_MANAGER_LOG_PATH, 
logFile.toAbsolutePath().toString());
         } catch (Exception e) {
             throw new AssertionError("Could not setup test.", e);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index ca7aaabe813..cb87b238586 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -214,8 +213,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                 
configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);
 
         final String taskManagerLogPath =
-                configuration.getString(
-                        ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
System.getProperty("log.file"));
+                configuration.get(TaskManagerOptions.TASK_MANAGER_LOG_PATH);
         final String taskManagerStdoutPath;
         final String taskManagerLogDir;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e266e3639c6..b413604d2dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.resources.CPUResource;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
@@ -165,6 +164,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.stream.IntStream.range;
+import static 
org.apache.flink.configuration.TaskManagerOptions.TASK_MANAGER_LOG_PATH;
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup;
 import static 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
@@ -2230,7 +2230,7 @@ class TaskExecutorTest {
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
         
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX,
 200);
-        configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
"/i/dont/exist");
+        configuration.set(TASK_MANAGER_LOG_PATH, "/i/dont/exist");
 
         try (TaskSubmissionTestEnvironment env =
                 new Builder(jobId)
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index 14d4272fdf6..a1b0571ec0d 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -187,7 +187,7 @@ class ExecutorImplITCase {
     private static Configuration getConfig() {
         Configuration config = new Configuration();
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("4m"));
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+        config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
         config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
         config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
index 59561e4a4ae..41f5756f5d2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -233,7 +232,7 @@ public abstract class SnapshotMigrationTestBase extends 
TestLogger {
         // Flink configuration
         final Configuration config = new Configuration();
 
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+        config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, 1);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
 
         UUID id = UUID.randomUUID();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 7a707cf65d9..621f1dcda41 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -73,6 +73,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static 
org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
+import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 
 /** Utility class that provides helper methods to work with Apache Hadoop 
YARN. */
 public final class Utils {
@@ -553,10 +554,7 @@ public final class Utils {
         }
         startCommandValues.put("args", argsStr);
 
-        final String commandTemplate =
-                flinkConfig.getString(
-                        ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-                        
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
+        final String commandTemplate = 
flinkConfig.get(YARN_CONTAINER_START_COMMAND_TEMPLATE);
         String startCommand = getStartCommand(commandTemplate, 
startCommandValues);
         LOG.debug("TaskManager start command: " + startCommand);
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index ca9cc90a599..6e1f14392f7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -146,6 +146,7 @@ import static 
org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr;
 import static org.apache.flink.yarn.Utils.getStartCommand;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static 
org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
+import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 
 /** The descriptor with deployment information for deploying a Flink cluster 
on Yarn. */
 public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> 
{
@@ -1895,9 +1896,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         startCommandValues.put("args", dynamicParameterListStr);
 
         final String commandTemplate =
-                flinkConfiguration.getString(
-                        ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-                        
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
+                flinkConfiguration.get(YARN_CONTAINER_START_COMMAND_TEMPLATE);
         final String amCommand = getStartCommand(commandTemplate, 
startCommandValues);
 
         amContainer.setCommands(Collections.singletonList(amCommand));
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 48f06099616..63d524d77bd 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -421,6 +421,25 @@ public class YarnConfigOptions {
                                                     "yarn-default.xml"))
                                     .build());
 
+    public static final ConfigOption<String> 
YARN_CONTAINER_START_COMMAND_TEMPLATE =
+            key("yarn.container-start-command-template")
+                    .stringType()
+                    .defaultValue("%java% %jvmmem% %jvmopts% %logging% %class% 
%args% %redirects%")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This configuration parameter 
allows users to pass custom settings (such as JVM paths, arguments etc.) to 
start the YARN. The following placeholders will be replaced: ")
+                                    .list(
+                                            text("%java%: Path to the Java 
executable"),
+                                            text("%jvmmem%: JVM memory limits 
and tweaks"),
+                                            text("%jvmopts%: Options for the 
Java VM"),
+                                            text(
+                                                    "%logging%: 
Logging-related configuration settings"),
+                                            text("%class%: Main class to 
execute"),
+                                            text("%args%: Arguments for the 
main class"),
+                                            text("%redirects%: Output 
redirects"))
+                                    .build());
+
     /**
      * Defines the configuration key of that external resource in Yarn. This 
is used as a suffix in
      * an actual config.
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 654b1b3bdde..d3f3a9e239e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -561,8 +562,8 @@ class UtilsTest {
 
         // now try some configurations with different 
yarn.container-start-command-template
 
-        cfg.setString(
-                ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+        cfg.set(
+                YARN_CONTAINER_START_COMMAND_TEMPLATE,
                 "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 
6 %redirects%");
         assertThat(
                         Utils.getTaskManagerShellCommand(
@@ -597,8 +598,8 @@ class UtilsTest {
                                 "6",
                                 redirects));
 
-        cfg.setString(
-                ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+        cfg.set(
+                YARN_CONTAINER_START_COMMAND_TEMPLATE,
                 "%java% %logging% %jvmopts% %jvmmem% %class% %args% 
%redirects%");
         assertThat(
                         Utils.getTaskManagerShellCommand(
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 1b77be1ed71..5eadae781e0 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -70,6 +70,7 @@ import static 
org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
 import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
 import static org.apache.flink.yarn.Utils.getPathFromLocalFile;
 import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
+import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
@@ -472,8 +473,8 @@ class YarnClusterDescriptorTest {
             cfg.set(
                     YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
                     YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME);
-            cfg.setString(
-                    ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+            cfg.set(
+                    YARN_CONTAINER_START_COMMAND_TEMPLATE,
                     "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 
%args% 6 %redirects%");
             assertThat(
                             clusterDescriptor
@@ -505,8 +506,8 @@ class YarnClusterDescriptorTest {
             cfg.set(
                     YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
                     YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME);
-            cfg.setString(
-                    ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+            cfg.set(
+                    YARN_CONTAINER_START_COMMAND_TEMPLATE,
                     "%java% %logging% %jvmopts% %jvmmem% %class% %args% 
%redirects%");
             // IMPORTANT: Be aware that we are using side effects here to 
modify the created
             // YarnClusterDescriptor


Reply via email to