This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 38f7b51d0cc45293dc71ad31607ecc685b11498f Author: sxnan <suxuanna...@gmail.com> AuthorDate: Tue Jan 9 17:50:34 2024 +0800 [FLINK-34083][config] Deprecate string configuration keys and unused constants in ConfigConstants --- .../generated/all_taskmanager_section.html | 18 +++++++++ .../generated/task_manager_configuration.html | 18 +++++++++ .../generated/yarn_config_configuration.html | 6 +++ .../client/deployment/executors/LocalExecutor.java | 3 +- .../client/program/PerJobMiniClusterFactory.java | 6 +-- .../flink/api/common/io/FileInputFormat.java | 11 ++---- .../flink/configuration/ConfigConstants.java | 44 +++++++++++++++------- .../flink/configuration/TaskManagerOptions.java | 25 ++++++++++++ .../itcases/HAQueryableStateFsBackendITCase.java | 5 +-- .../HAQueryableStateRocksDBBackendITCase.java | 5 +-- .../NonHAQueryableStateFsBackendITCase.java | 3 +- .../NonHAQueryableStateRocksDBBackendITCase.java | 3 +- .../runtime/webmonitor/WebFrontendITCase.java | 4 +- .../taskexecutor/TaskManagerConfiguration.java | 4 +- .../runtime/taskexecutor/TaskExecutorTest.java | 4 +- .../table/client/gateway/ExecutorImplITCase.java | 2 +- .../utils/SnapshotMigrationTestBase.java | 3 +- .../src/main/java/org/apache/flink/yarn/Utils.java | 6 +-- .../apache/flink/yarn/YarnClusterDescriptor.java | 5 +-- .../yarn/configuration/YarnConfigOptions.java | 19 ++++++++++ .../test/java/org/apache/flink/yarn/UtilsTest.java | 9 +++-- .../flink/yarn/YarnClusterDescriptorTest.java | 9 +++-- 22 files changed, 147 insertions(+), 65 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_section.html index 0e7ba018327..fa6f069d80c 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_section.html @@ -8,6 +8,12 @@ </tr> </thead> <tbody> + <tr> + <td><h5>minicluster.number-of-taskmanagers</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The number of task managers of MiniCluster.</td> + </tr> <tr> <td><h5>task.cancellation.interval</h5></td> <td style="word-wrap: break-word;">30000</td> @@ -86,6 +92,12 @@ <td><p>Enum</p></td> <td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li [...] </tr> + <tr> + <td><h5>taskmanager.log.path</h5></td> + <td style="word-wrap: break-word;">System.getProperty("log.file")</td> + <td>String</td> + <td>The path to the log file of the task manager.</td> + </tr> <tr> <td><h5>taskmanager.memory.min-segment-size</h5></td> <td style="word-wrap: break-word;">256 bytes</td> @@ -135,6 +147,12 @@ <td>String</td> <td>The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.</td> </tr> + <tr> + <td><h5>taskmanager.runtime.fs-timeout</h5></td> + <td style="word-wrap: break-word;">0 ms</td> + <td>Duration</td> + <td>The timeout for filesystem stream opening. A value of 0 indicates infinite waiting.</td> + </tr> <tr> <td><h5>taskmanager.slot.timeout</h5></td> <td style="word-wrap: break-word;">10 s</td> diff --git a/docs/layouts/shortcodes/generated/task_manager_configuration.html b/docs/layouts/shortcodes/generated/task_manager_configuration.html index d9ba88f35c0..52616e1ec79 100644 --- a/docs/layouts/shortcodes/generated/task_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/task_manager_configuration.html @@ -8,6 +8,12 @@ </tr> </thead> <tbody> + <tr> + <td><h5>minicluster.number-of-taskmanagers</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The number of task managers of MiniCluster.</td> + </tr> <tr> <td><h5>task.cancellation.interval</h5></td> <td style="word-wrap: break-word;">30000</td> @@ -68,6 +74,12 @@ <td><p>Enum</p></td> <td>Mode for the load-balance allocation strategy across all available <code class="highlighter-rouge">TaskManagers</code>.<ul><li>The <code class="highlighter-rouge">SLOTS</code> mode tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskManagers</code>.</li><li>The <code class="highlighter-rouge">NONE</code> mode is the default mode without any specified strategy.</li></ul><br /><br />Possible values:<ul><li>"NONE"</li><li>"SLOTS"</li [...] </tr> + <tr> + <td><h5>taskmanager.log.path</h5></td> + <td style="word-wrap: break-word;">System.getProperty("log.file")</td> + <td>String</td> + <td>The path to the log file of the task manager.</td> + </tr> <tr> <td><h5>taskmanager.network.bind-policy</h5></td> <td style="word-wrap: break-word;">"ip"</td> @@ -135,6 +147,12 @@ <td>String</td> <td>The external RPC port where the TaskManager is exposed. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.</td> </tr> + <tr> + <td><h5>taskmanager.runtime.fs-timeout</h5></td> + <td style="word-wrap: break-word;">0 ms</td> + <td>Duration</td> + <td>The timeout for filesystem stream opening. A value of 0 indicates infinite waiting.</td> + </tr> <tr> <td><h5>taskmanager.slot.timeout</h5></td> <td style="word-wrap: break-word;">10 s</td> diff --git a/docs/layouts/shortcodes/generated/yarn_config_configuration.html b/docs/layouts/shortcodes/generated/yarn_config_configuration.html index b747cf93274..901d12b00e3 100644 --- a/docs/layouts/shortcodes/generated/yarn_config_configuration.html +++ b/docs/layouts/shortcodes/generated/yarn_config_configuration.html @@ -92,6 +92,12 @@ <td><p>Enum</p></td> <td>Defines whether user-jars are included in the system class path as well as their positioning in the path.<br /><br />Possible values:<ul><li>"DISABLED": Exclude user jars from the system class path</li><li>"FIRST": Position at the beginning</li><li>"LAST": Position at the end</li><li>"ORDER": Position based on the name of the jar</li></ul></td> </tr> + <tr> + <td><h5>yarn.container-start-command-template</h5></td> + <td style="word-wrap: break-word;">"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"</td> + <td>String</td> + <td>This configuration parameter allows users to pass custom settings (such as JVM paths, arguments etc.) to start the YARN. The following placeholders will be replaced: <ul><li>%java%: Path to the Java executable</li><li>%jvmmem%: JVM memory limits and tweaks</li><li>%jvmopts%: Options for the Java VM</li><li>%logging%: Logging-related configuration settings</li><li>%class%: Main class to execute</li><li>%args%: Arguments for the main class</li><li>%redirects%: Output redire [...] + </tr> <tr> <td><h5>yarn.containers.vcores</h5></td> <td style="word-wrap: break-word;">-1</td> diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java index 45847a6a62d..898d2b733c9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.Plan; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.program.PerJobMiniClusterFactory; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -97,7 +96,7 @@ public class LocalExecutor implements PipelineExecutor { configuration.getInteger( TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism()); final int numTaskManagers = - configuration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS); plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java index 7dc4585e58d..857946d47e5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.client.program; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; @@ -124,10 +123,7 @@ public final class PerJobMiniClusterFactory { configuration.setString(RestOptions.BIND_PORT, "0"); } - int numTaskManagers = - configuration.getInteger( - ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, - ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER); + int numTaskManagers = configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS); Map<String, String> overwriteParallelisms = configuration.get(PipelineOptions.PARALLELISM_OVERRIDES); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index b8c57eb6615..c3fc58fedb0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory; import org.apache.flink.api.common.io.compression.XZInputStreamFactory; import org.apache.flink.api.common.io.compression.ZStandardInputStreamFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.BlockLocation; @@ -50,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.flink.configuration.TaskManagerOptions.FS_STREAM_OPENING_TIME_OUT; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -98,17 +98,14 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS * @param configuration The configuration to load defaults from */ private static void initDefaultsFromConfiguration(Configuration configuration) { - final long to = - configuration.getLong( - ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY, - ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT); + final long to = configuration.get(FS_STREAM_OPENING_TIME_OUT).toMillis(); if (to < 0) { LOG.error( "Invalid timeout value for filesystem stream opening: " + to + ". Using default value of " - + ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT); - DEFAULT_OPENING_TIMEOUT = ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT; + + FS_STREAM_OPENING_TIME_OUT.defaultValue().toMillis()); + DEFAULT_OPENING_TIMEOUT = FS_STREAM_OPENING_TIME_OUT.defaultValue().toMillis(); } else if (to == 0) { DEFAULT_OPENING_TIMEOUT = 300000; // 5 minutes } else { diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 37e081e47cf..ad7857af06d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -215,8 +215,12 @@ public final class ConfigConstants { */ @Deprecated public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dirs"; - /** The config parameter defining the taskmanager log file location. */ - public static final String TASK_MANAGER_LOG_PATH_KEY = "taskmanager.log.path"; + /** + * The config parameter defining the taskmanager log file location. + * + * @deprecated Use {@link TaskManagerOptions#TASK_MANAGER_LOG_PATH} instead. + */ + @Deprecated public static final String TASK_MANAGER_LOG_PATH_KEY = "taskmanager.log.path"; /** @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead */ @Deprecated public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size"; @@ -350,7 +354,10 @@ public final class ConfigConstants { /** * The config parameter defining the timeout for filesystem stream opening. A value of 0 * indicates infinite waiting. + * + * @deprecated use {@link TaskManagerOptions#FS_STREAM_OPENING_TIME_OUT} instead. */ + @Deprecated public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout"; /** @@ -496,7 +503,12 @@ public final class ConfigConstants { */ @Deprecated public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env."; - /** Template for the YARN container start invocation. */ + /** + * Template for the YARN container start invocation. + * + * @deprecated in favor of {@code YarnConfigOptions#YARN_CONTAINER_START_COMMAND_TEMPLATE}. + */ + @Deprecated public static final String YARN_CONTAINER_START_COMMAND_TEMPLATE = "yarn.container-start-command-template"; @@ -929,7 +941,7 @@ public final class ConfigConstants { @PublicEvolving @Deprecated public static final String HA_MODE = "high-availability"; /** Ports used by the job manager if not in 'none' recovery mode. */ - @PublicEvolving + @Deprecated public static final String HA_JOB_MANAGER_PORT = "high-availability.jobmanager.port"; /** @deprecated Deprecated in favour of {@link #HA_MODE}. */ @@ -1203,7 +1215,7 @@ public final class ConfigConstants { @Deprecated public static final int DEFAULT_PARALLELISM = 1; /** The default number of execution retries. */ - public static final int DEFAULT_EXECUTION_RETRIES = 0; + @Deprecated public static final int DEFAULT_EXECUTION_RETRIES = 0; // ------------------------------ Runtime --------------------------------- @@ -1350,7 +1362,7 @@ public final class ConfigConstants { /** * The default timeout for filesystem stream opening: infinite (means max long milliseconds). */ - public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0; + @Deprecated public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0; /** * Whether to use the LargeRecordHandler when spilling. @@ -1416,10 +1428,10 @@ public final class ConfigConstants { * The default filesystem to be used, if no other scheme is specified in the user-provided URI * (= local filesystem). */ - public static final String DEFAULT_FILESYSTEM_SCHEME = "file:///"; + @Deprecated public static final String DEFAULT_FILESYSTEM_SCHEME = "file:///"; /** The default behavior with respect to overwriting existing files (= not overwrite). */ - public static final boolean DEFAULT_FILESYSTEM_OVERWRITE = false; + @Deprecated public static final boolean DEFAULT_FILESYSTEM_OVERWRITE = false; /** * The default behavior for output directory creating (create only directory when parallelism @@ -1585,18 +1597,22 @@ public final class ConfigConstants { // ----------------------------- Streaming Values -------------------------- - public static final String DEFAULT_STATE_BACKEND = "jobmanager"; + @Deprecated public static final String DEFAULT_STATE_BACKEND = "jobmanager"; // ----------------------------- LocalExecution ---------------------------- - /** Sets the number of local task managers. */ - public static final String LOCAL_NUMBER_TASK_MANAGER = "local.number-taskmanager"; + /** + * Sets the number of local task managers. + * + * @deprecated use {@link TaskManagerOptions#MINI_CLUSTER_NUM_TASK_MANAGERS} instead + */ + @Deprecated public static final String LOCAL_NUMBER_TASK_MANAGER = "local.number-taskmanager"; public static final int DEFAULT_LOCAL_NUMBER_TASK_MANAGER = 1; - public static final String LOCAL_NUMBER_JOB_MANAGER = "local.number-jobmanager"; + @Deprecated public static final String LOCAL_NUMBER_JOB_MANAGER = "local.number-jobmanager"; - public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1; + @Deprecated public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1; /** @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead. */ @Deprecated @@ -1753,7 +1769,7 @@ public final class ConfigConstants { public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins"; /** The environment variable name which contains the location of the bin directory. */ - public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR"; + @Deprecated public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR"; /** The environment variable name which contains the Flink installation root directory. */ public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 2c0a9801ac4..4c8f250c494 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -808,6 +808,31 @@ public class TaskManagerOptions { code(TaskManagerLoadBalanceMode.NONE.name()))) .build()); + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER) + @Documentation.OverrideDefault("System.getProperty(\"log.file\")") + public static final ConfigOption<String> TASK_MANAGER_LOG_PATH = + ConfigOptions.key("taskmanager.log.path") + .stringType() + .defaultValue(System.getProperty("log.file")) + .withDescription("The path to the log file of the task manager."); + + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER) + public static final ConfigOption<Duration> FS_STREAM_OPENING_TIME_OUT = + ConfigOptions.key("taskmanager.runtime.fs-timeout") + .durationType() + .defaultValue(Duration.ZERO) + .withDeprecatedKeys("taskmanager.runtime.fs_timeout") + .withDescription( + "The timeout for filesystem stream opening. A value of 0 indicates infinite waiting."); + + @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER) + public static final ConfigOption<Integer> MINI_CLUSTER_NUM_TASK_MANAGERS = + ConfigOptions.key("minicluster.number-of-taskmanagers") + .intType() + .defaultValue(1) + .withDeprecatedKeys("local.number-taskmanager") + .withDescription("The number of task managers of MiniCluster."); + /** Type of redirection of {@link System#out} and {@link System#err}. */ public enum SystemOutMode { diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java index 2465c6052cf..cdb0f33277e 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.queryablestate.itcases; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.MemorySize; @@ -46,7 +45,6 @@ import java.nio.file.Path; /** Several integration tests for queryable state using the {@link FsStateBackend}. */ class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { - private static final int NUM_JMS = 2; // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that // we always use all TaskManagers so that the JM oracle is always properly re-registered private static final int NUM_TMS = 2; @@ -104,8 +102,7 @@ class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { Configuration config = new Configuration(); config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index 3e7cc014997..b8599eb2836 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.queryablestate.itcases; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.MemorySize; @@ -46,7 +45,6 @@ import java.nio.file.Path; /** Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase { - private static final int NUM_JMS = 2; // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that // we always use all TaskManagers so that the JM oracle is always properly re-registered private static final int NUM_TMS = 2; @@ -102,8 +100,7 @@ class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBas Configuration config = new Configuration(); config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java index 26c7d366bf3..7e4a4f1059f 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.queryablestate.itcases; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.QueryableStateOptions; @@ -85,7 +84,7 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe Configuration config = new Configuration(); config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index 6e8e0e8be6f..22306bf5130 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.queryablestate.itcases; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.QueryableStateOptions; @@ -84,7 +83,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt Configuration config = new Configuration(); config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index b677a094d54..c190d4fc354 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -107,8 +107,8 @@ class WebFrontendITCase { Files.createFile(outFile); config.setString(WebOptions.LOG_PATH, logFile.toAbsolutePath().toString()); - config.setString( - ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toAbsolutePath().toString()); + config.set( + TaskManagerOptions.TASK_MANAGER_LOG_PATH, logFile.toAbsolutePath().toString()); } catch (Exception e) { throw new AssertionError("Could not setup test.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index ca7aaabe813..cb87b238586 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.TaskManagerOptions; @@ -214,8 +213,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY); final String taskManagerLogPath = - configuration.getString( - ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")); + configuration.get(TaskManagerOptions.TASK_MANAGER_LOG_PATH); final String taskManagerStdoutPath; final String taskManagerLogDir; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e266e3639c6..b413604d2dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.resources.CPUResource; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; @@ -165,6 +164,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static java.util.stream.IntStream.range; +import static org.apache.flink.configuration.TaskManagerOptions.TASK_MANAGER_LOG_PATH; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup; import static org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils.DEFAULT_RESOURCE_PROFILE; @@ -2230,7 +2230,7 @@ class TaskExecutorTest { configuration.setInteger( NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); - configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); + configuration.set(TASK_MANAGER_LOG_PATH, "/i/dont/exist"); try (TaskSubmissionTestEnvironment env = new Builder(jobId) diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java index 14d4272fdf6..a1b0571ec0d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java @@ -187,7 +187,7 @@ class ExecutorImplITCase { private static Configuration getConfig() { Configuration config = new Configuration(); config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setBoolean(WebOptions.SUBMIT_ENABLE, false); config.set(StateBackendOptions.STATE_BACKEND, "hashmap"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java index 59561e4a4ae..41f5756f5d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SnapshotMigrationTestBase.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.MemorySize; @@ -233,7 +232,7 @@ public abstract class SnapshotMigrationTestBase extends TestLogger { // Flink configuration final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + config.set(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS, 1); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM); UUID id = UUID.randomUUID(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 7a707cf65d9..621f1dcda41 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -73,6 +73,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR; +import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE; /** Utility class that provides helper methods to work with Apache Hadoop YARN. */ public final class Utils { @@ -553,10 +554,7 @@ public final class Utils { } startCommandValues.put("args", argsStr); - final String commandTemplate = - flinkConfig.getString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, - ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE); + final String commandTemplate = flinkConfig.get(YARN_CONTAINER_START_COMMAND_TEMPLATE); String startCommand = getStartCommand(commandTemplate, startCommandValues); LOG.debug("TaskManager start command: " + startCommand); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index ca9cc90a599..6e1f14392f7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -146,6 +146,7 @@ import static org.apache.flink.yarn.Utils.getPathFromLocalFilePathStr; import static org.apache.flink.yarn.Utils.getStartCommand; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR; +import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE; /** The descriptor with deployment information for deploying a Flink cluster on Yarn. */ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { @@ -1895,9 +1896,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { startCommandValues.put("args", dynamicParameterListStr); final String commandTemplate = - flinkConfiguration.getString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, - ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE); + flinkConfiguration.get(YARN_CONTAINER_START_COMMAND_TEMPLATE); final String amCommand = getStartCommand(commandTemplate, startCommandValues); amContainer.setCommands(Collections.singletonList(amCommand)); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index 48f06099616..63d524d77bd 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -421,6 +421,25 @@ public class YarnConfigOptions { "yarn-default.xml")) .build()); + public static final ConfigOption<String> YARN_CONTAINER_START_COMMAND_TEMPLATE = + key("yarn.container-start-command-template") + .stringType() + .defaultValue("%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%") + .withDescription( + Description.builder() + .text( + "This configuration parameter allows users to pass custom settings (such as JVM paths, arguments etc.) to start the YARN. The following placeholders will be replaced: ") + .list( + text("%java%: Path to the Java executable"), + text("%jvmmem%: JVM memory limits and tweaks"), + text("%jvmopts%: Options for the Java VM"), + text( + "%logging%: Logging-related configuration settings"), + text("%class%: Main class to execute"), + text("%args%: Arguments for the main class"), + text("%redirects%: Output redirects")) + .build()); + /** * Defines the configuration key of that external resource in Yarn. This is used as a suffix in * an actual config. diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index 654b1b3bdde..d3f3a9e239e 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.stream.Stream; +import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -561,8 +562,8 @@ class UtilsTest { // now try some configurations with different yarn.container-start-command-template - cfg.setString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + cfg.set( + YARN_CONTAINER_START_COMMAND_TEMPLATE, "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%"); assertThat( Utils.getTaskManagerShellCommand( @@ -597,8 +598,8 @@ class UtilsTest { "6", redirects)); - cfg.setString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + cfg.set( + YARN_CONTAINER_START_COMMAND_TEMPLATE, "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%"); assertThat( Utils.getTaskManagerShellCommand( diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 1b77be1ed71..5eadae781e0 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -70,6 +70,7 @@ import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec; import static org.apache.flink.yarn.Utils.getPathFromLocalFile; import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR; +import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; @@ -472,8 +473,8 @@ class YarnClusterDescriptorTest { cfg.set( YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME); - cfg.setString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + cfg.set( + YARN_CONTAINER_START_COMMAND_TEMPLATE, "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%"); assertThat( clusterDescriptor @@ -505,8 +506,8 @@ class YarnClusterDescriptorTest { cfg.set( YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME); - cfg.setString( - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, + cfg.set( + YARN_CONTAINER_START_COMMAND_TEMPLATE, "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%"); // IMPORTANT: Be aware that we are using side effects here to modify the created // YarnClusterDescriptor