This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c148b62166d8fec9e7e525a836de890c2f12973b
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Jan 12 21:56:15 2024 +0800

    [FLINK-33721][core] Extend BashJavaUtils to support reading and writing standard yaml file.
    
    This closes #24091.
---
 .../src/main/flink-bin/bin/bash-java-utils.sh      | 170 +++++++++++++++++
 .../flink-bin/bin/config-parser-utils.sh}          |  32 ++--
 flink-dist/src/main/flink-bin/bin/config.sh        | 143 +--------------
 flink-dist/src/test/bin/runBashJavaUtilsCmd.sh     |   2 +-
 .../src/test/bin/runExtractLoggingOutputs.sh       |   2 +-
 .../org/apache/flink/dist/BashJavaUtilsITCase.java |  94 ++++++++++
 .../entrypoint/ModifiableClusterConfiguration.java |  82 +++++++++
 ...odifiableClusterConfigurationParserFactory.java |  98 ++++++++++
 .../parser/ConfigurationCommandLineOptions.java    |  64 +++++++
 .../runtime/util/ConfigurationParserUtils.java     |  68 +++++++
 .../flink/runtime/util/bash/BashJavaUtils.java     |  12 +-
 .../flink/runtime/util/bash/FlinkConfigLoader.java |  18 +-
 ...iableClusterConfigurationParserFactoryTest.java |  96 ++++++++++
 .../runtime/util/bash/FlinkConfigLoaderTest.java   | 202 +++++++++++++++++++++
 14 files changed, 920 insertions(+), 163 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh 
b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
new file mode 100755
index 00000000000..f4d02327197
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh
@@ -0,0 +1,170 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Select the java launcher. The default is whatever `java` resolves to on the
+# PATH; "$JAVA_HOME"/bin/java is used only on non-Cygwin systems and only when
+# JAVA_HOME points to an existing directory.
+UNAME=$(uname -s)
+JAVA_RUN=java
+if [[ "${UNAME:0:6}" != "CYGWIN" && -d "$JAVA_HOME" ]]; then
+    JAVA_RUN="$JAVA_HOME"/bin/java
+fi
+
+# Prints a path list (for example a java classpath) in the form expected on the
+# current platform.
+# $1: the path list
+# On Cygwin the list is converted with `cygpath -wp`; on every other platform it
+# is printed unchanged.
+manglePathList() {
+    UNAME=$(uname -s)
+    # a path list, for example a java classpath
+    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+        # Quote the substitution so converted paths containing spaces stay intact.
+        printf '%s\n' "$(cygpath -wp "$1")"
+    else
+        # Quote the argument: unquoted, entries containing spaces were re-joined with
+        # single spaces and wildcard entries such as lib/* were expanded by the shell.
+        printf '%s\n' "$1"
+    fi
+}
+
+# Locates the single flink-dist*.jar below a library directory and prints its path.
+# $1: (optional) directory to search; when empty, $FLINK_LIB_DIR is searched
+# Writes an error to stderr and exits with status 1 when no jar, or more than one
+# jar, is found.
+findFlinkDistJar() {
+    local FLINK_DIST
+    local LIB_DIR="${1:-$FLINK_LIB_DIR}"
+    FLINK_DIST="$(find "$LIB_DIR" -name 'flink-dist*.jar')"
+    local FLINK_DIST_COUNT
+    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)"
+
+    # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored
+    # as the classpath and exit function with empty classpath to force process failure
+    # The messages name the directory that was actually searched, which differs from
+    # FLINK_LIB_DIR when the caller passed an explicit directory.
+    if [[ "$FLINK_DIST" == "" ]]; then
+        (>&2 echo "[ERROR] Flink distribution jar not found in $LIB_DIR.")
+        exit 1
+    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
+        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $LIB_DIR. Please resolve.")
+        exit 1
+    fi
+
+    echo "$FLINK_DIST"
+}
+
+# Runs org.apache.flink.runtime.util.bash.BashJavaUtils and prints the last 1000 lines of its
+# combined stdout/stderr.
+# $1: BashJavaUtils command
+# $2: configuration directory, passed as --configDir
+# $3: classpath (converted with manglePathList)
+# $4...: additional arguments appended to the command line
+# Uses the launcher stored in JAVA_RUN. If the JVM exits with a non-zero status, an error and the
+# captured output are written to stderr and the shell exits with status 1.
+runBashJavaUtilsCmd() {
+    local cmd=$1
+    local conf_dir=$2
+    local class_path=$3
+    local dynamic_args=${@:4}
+    class_path=$(manglePathList "${class_path}")
+
+    # Declared separately from the assignment: with `local output=...` the status seen afterwards
+    # is that of `local` (always 0). The trailing `exit` makes the substitution report the JVM's
+    # status instead of the status of `tail`.
+    local output
+    # $dynamic_args is left unquoted on purpose so it is split back into separate arguments.
+    if ! output=$("${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000; exit "${PIPESTATUS[0]}"); then
+        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
+        # Print the output in case the user redirect the log to console.
+        echo "$output" 1>&2
+        exit 1
+    fi
+
+    echo "$output"
+}
+
+# Runs the UPDATE_AND_GET_FLINK_CONFIGURATION command of BashJavaUtils and prints its result.
+# $1: configuration directory
+# $2: bin directory (expected to contain bash-java-utils.jar)
+# $3: lib directory (searched for the flink-dist jar)
+# $4...: additional arguments forwarded to BashJavaUtils
+# Returns the status of parseConfigurationAndExportLogs. Whatever that function printed (the
+# configuration, or its error messages) is always echoed so callers can show it.
+updateAndGetFlinkConfiguration() {
+    local FLINK_CONF_DIR="$1"
+    local FLINK_BIN_DIR="$2"
+    local FLINK_LIB_DIR="$3"
+    local command_result
+    local exit_code=0
+    # parseConfigurationAndExportLogs signals failure with `exit 1`, which only ends the command
+    # substitution. Remember that status so the caller's `$?` check can actually observe it.
+    command_result=$(parseConfigurationAndExportLogs "$FLINK_CONF_DIR" "$FLINK_BIN_DIR" "$FLINK_LIB_DIR" "UPDATE_AND_GET_FLINK_CONFIGURATION" "${@:4}") || exit_code=$?
+    echo "$command_result"
+    return "$exit_code"
+}
+
+# Runs a BashJavaUtils command and prints its result lines with the result prefix removed.
+# $1: configuration directory
+# $2: bin directory; bash-java-utils.jar is taken from here
+# $3: lib directory; searched for the flink-dist jar
+# $4: BashJavaUtils command
+# $5...: additional arguments forwarded to BashJavaUtils
+# If the output contains no line with the result prefix, error messages and the raw output are
+# printed to stdout and the shell exits with status 1.
+parseConfigurationAndExportLogs() {
+    local FLINK_CONF_DIR="$1"
+    local FLINK_BIN_DIR="$2"
+    local FLINK_LIB_DIR="$3"
+    local COMMAND="$4"
+    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+    local java_utils_output
+    local execution_results
+
+    # The lib directory is quoted so that a path containing spaces reaches findFlinkDistJar as a
+    # single argument.
+    java_utils_output=$(runBashJavaUtilsCmd "${COMMAND}" "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar "${FLINK_LIB_DIR}")" "${@:5}")
+
+    # grep returns non-zero when no result line is present.
+    if ! execution_results=$(echo "${java_utils_output}" | grep "${EXECUTION_PREFIX}"); then
+      echo "[ERROR] Could not parse configurations properly."
+      echo "[ERROR] Raw output from BashJavaUtils:"
+      echo "$java_utils_output"
+      exit 1
+    fi
+
+    echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+# Prints the lines of a BashJavaUtils output that do not contain the execution
+# result prefix, i.e. everything except the result lines.
+# $1: raw BashJavaUtils output
+extractLoggingOutputs() {
+    local raw_output="$1"
+    local result_marker="BASH_JAVA_UTILS_EXEC_RESULT:"
+
+    echo "${raw_output}" | grep -v "${result_marker}"
+}
+
+# Extracts the execution result lines from a BashJavaUtils output and prints them with the
+# result prefix removed.
+# $1: raw BashJavaUtils output
+# $2: number of result lines that must be present
+# Writes an error to stderr and exits with status 1 when there is no result line or when the
+# number of result lines differs from $2.
+extractExecutionResults() {
+    local output="$1"
+    local expected_lines="$2"
+    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+    local execution_results
+    local num_lines
+
+    execution_results=$(echo "${output}" | grep "${EXECUTION_PREFIX}")
+    num_lines=$(echo "${execution_results}" | wc -l)
+    # explicit check for empty result, because if execution_results is empty, then wc returns 1
+    if [[ -z ${execution_results} ]]; then
+        echo "[ERROR] The execution result is empty." 1>&2
+        exit 1
+    fi
+    if [[ ${num_lines} -ne ${expected_lines} ]]; then
+        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
+        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
+        echo "$output" 1>&2
+        exit 1
+    fi
+
+    echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+# Runs a BashJavaUtils resource command and exports what it reports.
+# $1: BashJavaUtils command
+# $2...: additional arguments forwarded to BashJavaUtils
+# Reads FLINK_CONF_DIR and FLINK_BIN_DIR. Exactly two result lines are expected: the first is
+# appended to JVM_ARGS, the second is stored in DYNAMIC_PARAMETERS, and a summary including the
+# remaining output lines is appended to FLINK_INHERITED_LOGS (all three are exported).
+# If the two result lines cannot be extracted, an error and the raw output are printed and the
+# shell exits with status 1.
+parseResourceParamsAndExportLogs() {
+  local cmd=$1
+  java_utils_output=$(runBashJavaUtilsCmd "${cmd}" "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}")
+  logging_output=$(extractLoggingOutputs "${java_utils_output}")
+
+  if ! params_output=$(extractExecutionResults "${java_utils_output}" 2); then
+    echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+    echo "[ERROR] Raw output from BashJavaUtils:"
+    echo "$java_utils_output"
+    exit 1
+  fi
+
+  jvm_params=$(echo "${params_output}" | head -n1)
+  export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+  # The former `IFS=" "` prefix on echo had no effect on the quoted expansion and was dropped.
+  DYNAMIC_PARAMETERS=$(echo "${params_output}" | tail -n1)
+  export DYNAMIC_PARAMETERS
+
+  export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+dynamic_configs: $DYNAMIC_PARAMETERS
+logs: $logging_output
+"
+}
diff --git a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh 
b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
similarity index 65%
copy from flink-dist/src/test/bin/runExtractLoggingOutputs.sh
copy to flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
index d8929012731..cfed6db8618 100755
--- a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh
+++ b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh
@@ -17,22 +17,28 @@
 # limitations under the License.
 
################################################################################
 
-USAGE="Usage: runExtractLoggingOutputs.sh <input>"
+USAGE="Usage: config-parser-utils.sh FLINK_CONF_DIR FLINK_BIN_DIR 
FLINK_LIB_DIR [dynamic args...]"
 
-INPUT="$1"
-
-if [[ -z "${INPUT}" ]]; then
-  echo "$USAGE"
-  exit 1
+if [ "$#" -lt 3 ]; then
+    echo "$USAGE"
+    exit 1
 fi
 
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
+source "$2"/bash-java-utils.sh
+
+ARGS=("${@:1}")
+result=$(updateAndGetFlinkConfiguration "${ARGS[@]}")
 
-FLINK_CONF_DIR=${bin}/../../main/resources
-FLINK_TARGET_DIR=${bin}/../../../target
-FLINK_DIST_JAR=`find $FLINK_TARGET_DIR -name 'flink-dist*.jar'`
+if [[ $? -ne 0 ]]; then
+  echo "[ERROR] Could not get configurations properly, the result is :"
+  echo "$result"
+  exit 1
+fi
 
-. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
+# Choose the file to overwrite inside the configuration directory ($1):
+# flink-conf.yaml when it exists, otherwise config.yaml.
+CONF_FILE="$1/flink-conf.yaml"
+if [ ! -e "$1/flink-conf.yaml" ]; then
+  CONF_FILE="$1/config.yaml"
+fi;
+
+# Overwrite the chosen file with the configuration held in ${result}
+echo "${result}" > "$CONF_FILE";
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index c798bdf4c01..d90e4362f7e 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -47,25 +47,6 @@ constructFlinkClassPath() {
     echo "$FLINK_CLASSPATH""$FLINK_DIST"
 }
 
-findFlinkDistJar() {
-    local FLINK_DIST
-    FLINK_DIST="$(find "$FLINK_LIB_DIR" -name 'flink-dist*.jar')"
-    local FLINK_DIST_COUNT
-    FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)"
-
-    # If flink-dist*.jar cannot be resolved write error messages to stderr 
since stdout is stored
-    # as the classpath and exit function with empty classpath to force process 
failure
-    if [[ "$FLINK_DIST" == "" ]]; then
-        (>&2 echo "[ERROR] Flink distribution jar not found in 
$FLINK_LIB_DIR.")
-        exit 1
-    elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then
-        (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. 
Please resolve.")
-        exit 1
-    fi
-
-    echo "$FLINK_DIST"
-}
-
 findSqlGatewayJar() {
     local SQL_GATEWAY
     SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')"
@@ -116,16 +97,6 @@ manglePath() {
     fi
 }
 
-manglePathList() {
-    UNAME=$(uname -s)
-    # a path list, for example a java classpath
-    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
-        echo `cygpath -wp "$1"`
-    else
-        echo $1
-    fi
-}
-
 # Looks up a config value by key from a simple YAML-style key-value map.
 # $1: key to look up
 # $2: default value to return if key does not exist
@@ -133,11 +104,9 @@ manglePathList() {
 readFromConfig() {
     local key=$1
     local defaultValue=$2
-    local configFile=$3
+    local configuration=$3
 
-    # first extract the value with the given key (1st sed), then trim the 
result (2nd sed)
-    # if a key exists multiple times, take the "last" one (tail)
-    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" 
| sed "s/^ *//;s/ *$//" | tail -n 1`
+    local value=$(echo "$configuration" | grep "^[ ]*${key}[ ]*:" | cut -d ':' 
-f2- | sed "s/^ *//;s/ *$//" | tail -n 1)
 
     [ -z "$value" ] && echo "$defaultValue" || echo "$value"
 }
@@ -147,7 +116,6 @@ readFromConfig() {
 # -or- the respective environment variables are not set.
 
########################################################################################################################
 
-
 # WARNING !!! , these values are only used if there is nothing else is 
specified in
 # conf/flink-conf.yaml
 
@@ -234,8 +202,6 @@ FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
 if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; 
fi
 FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
 DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
-FLINK_CONF_FILE="flink-conf.yaml"
-YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
 
 ### Exported environment variables ###
 export FLINK_CONF_DIR
@@ -246,36 +212,13 @@ export FLINK_LIB_DIR
 # export /opt dir to access it for the SQL client
 export FLINK_OPT_DIR
 
+# Load the BashJavaUtils helpers, then let BashJavaUtils produce the flattened configuration.
+# YAML_CONF holds that configuration text, not a file path. FLINK_LIB_DIR is quoted so that an
+# empty value or a path with spaces cannot shift -flatten into the lib-directory position.
+source "${FLINK_BIN_DIR}/bash-java-utils.sh"
+YAML_CONF=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}" "${FLINK_LIB_DIR}" -flatten)
+
 
########################################################################################################################
 # ENVIRONMENT VARIABLES
 
########################################################################################################################
 
-# read JAVA_HOME from config with no default value
-MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
-# check if config specified JAVA_HOME
-if [ -z "${MY_JAVA_HOME}" ]; then
-    # config did not specify JAVA_HOME. Use system JAVA_HOME
-    MY_JAVA_HOME="${JAVA_HOME}"
-fi
-# check if we have a valid JAVA_HOME and if java is not available
-if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
-    echo "Please specify JAVA_HOME. Either in Flink config 
./conf/flink-conf.yaml or as system-wide JAVA_HOME."
-    exit 1
-else
-    JAVA_HOME="${MY_JAVA_HOME}"
-fi
-
-UNAME=$(uname -s)
-if [ "${UNAME:0:6}" == "CYGWIN" ]; then
-    JAVA_RUN=java
-else
-    if [[ -d "$JAVA_HOME" ]]; then
-        JAVA_RUN="$JAVA_HOME"/bin/java
-    else
-        JAVA_RUN=java
-    fi
-fi
-
 # Define HOSTNAME if it is not already set
 if [ -z "${HOSTNAME}" ]; then
     HOSTNAME=`hostname`
@@ -561,82 +504,6 @@ TMWorkers() {
     fi
 }
 
-runBashJavaUtilsCmd() {
-    local cmd=$1
-    local conf_dir=$2
-    local class_path=$3
-    local dynamic_args=${@:4}
-    class_path=`manglePathList "${class_path}"`
-
-    local output=`"${JAVA_RUN}" -classpath "${class_path}" 
org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir 
"${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
-    if [[ $? -ne 0 ]]; then
-        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
-        # Print the output in case the user redirect the log to console.
-        echo "$output" 1>&2
-        exit 1
-    fi
-
-    echo "$output"
-}
-
-extractExecutionResults() {
-    local output="$1"
-    local expected_lines="$2"
-    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
-    local execution_results
-    local num_lines
-
-    execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
-    num_lines=$(echo "${execution_results}" | wc -l)
-    # explicit check for empty result, because if execution_results is empty, 
then wc returns 1
-    if [[ -z ${execution_results} ]]; then
-        echo "[ERROR] The execution result is empty." 1>&2
-        exit 1
-    fi
-    if [[ ${num_lines} -ne ${expected_lines} ]]; then
-        echo "[ERROR] The execution results has unexpected number of lines, 
expected: ${expected_lines}, actual: ${num_lines}." 1>&2
-        echo "[ERROR] An execution result line is expected following the 
prefix '${EXECUTION_PREFIX}'" 1>&2
-        echo "$output" 1>&2
-        exit 1
-    fi
-
-    echo "${execution_results//${EXECUTION_PREFIX}/}"
-}
-
-extractLoggingOutputs() {
-    local output="$1"
-    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
-
-    echo "${output}" | grep -v ${EXECUTION_PREFIX}
-}
-
-parseResourceParamsAndExportLogs() {
-  local cmd=$1
-  java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" 
"${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}")
-  logging_output=$(extractLoggingOutputs "${java_utils_output}")
-  params_output=$(extractExecutionResults "${java_utils_output}" 2)
-
-  if [[ $? -ne 0 ]]; then
-    echo "[ERROR] Could not get JVM parameters and dynamic configurations 
properly."
-    echo "[ERROR] Raw output from BashJavaUtils:"
-    echo "$java_utils_output"
-    exit 1
-  fi
-
-  jvm_params=$(echo "${params_output}" | head -n1)
-  export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
-  export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)
-
-  export FLINK_INHERITED_LOGS="
-$FLINK_INHERITED_LOGS
-
-RESOURCE_PARAMS extraction logs:
-jvm_params: $jvm_params
-dynamic_configs: $DYNAMIC_PARAMETERS
-logs: $logging_output
-"
-}
-
 parseJmArgsAndExportLogs() {
   parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS "$@"
 }
diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh 
b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
index 41efe8ad1ee..e0340146b87 100755
--- a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
+++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
@@ -37,7 +37,7 @@ FLINK_TARGET_DIR=${bin}/../../../target
 FLINK_DIST_JARS=(`find ${FLINK_TARGET_DIR} -maxdepth 1 -name 
'flink-dist*.jar'`)
 FLINK_DIST_CLASSPATH=`echo ${FLINK_DIST_JARS[@]} | tr ' ' ':'`
 
-. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
+. ${bin}/../../main/flink-bin/bin/bash-java-utils.sh > /dev/null
 
 output=$(runBashJavaUtilsCmd ${COMMAND} ${FLINK_CONF_DIR} 
"$FLINK_TARGET_DIR/bash-java-utils.jar:${FLINK_DIST_CLASSPATH}" $DYNAMIC_OPTS)
 extractExecutionResults "${output}" ${EXPECTED_LINES}
diff --git a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh 
b/flink-dist/src/test/bin/runExtractLoggingOutputs.sh
index d8929012731..cd04666afa5 100755
--- a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh
+++ b/flink-dist/src/test/bin/runExtractLoggingOutputs.sh
@@ -33,6 +33,6 @@ FLINK_CONF_DIR=${bin}/../../main/resources
 FLINK_TARGET_DIR=${bin}/../../../target
 FLINK_DIST_JAR=`find $FLINK_TARGET_DIR -name 'flink-dist*.jar'`
 
-. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
+. ${bin}/../../main/flink-bin/bin/bash-java-utils.sh > /dev/null
 
 extractLoggingOutputs "${INPUT}"
diff --git 
a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java 
b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
index e9c0e1cf8c2..744ae1dccdd 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
@@ -120,6 +120,100 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
         
assertThat(Long.valueOf(jvmParams.get("-XX:MaxMetaspaceSize="))).isEqualTo(metaspace);
     }
 
+    /**
+     * Runs {@code UPDATE_AND_GET_FLINK_CONFIGURATION} without any modification option and checks
+     * that the script prints the expected number of result lines.
+     */
+    @Test
+    void testGetConfiguration() throws Exception {
+        int expectedResultLines = 13;
+        String[] commands = {
+            RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+            BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
+            String.valueOf(expectedResultLines)
+        };
+        List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+        assertThat(lines).hasSize(expectedResultLines);
+    }
+
+    /**
+     * Passes {@code -rmKey parallelism.default} and checks that one result line fewer is printed
+     * and that the {@code parallelism.default: 1} entry is gone.
+     */
+    @Test
+    void testGetConfigurationRemoveKey() throws Exception {
+        int expectedResultLines = 12;
+        String[] commands = {
+            RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+            BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
+            String.valueOf(expectedResultLines),
+            "-rmKey",
+            "parallelism.default"
+        };
+        List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+        assertThat(lines).hasSize(expectedResultLines);
+        assertThat(lines).doesNotContain("parallelism.default: 1");
+    }
+
+    /**
+     * Passes {@code -rmKV parallelism.default=1}, whose value matches the configured one, and
+     * checks that one result line fewer is printed and that the entry is gone.
+     */
+    @Test
+    void testGetConfigurationRemoveKeyValue() throws Exception {
+        int expectedResultLines = 12;
+        String[] commands = {
+            RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+            BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
+            String.valueOf(expectedResultLines),
+            "-rmKV",
+            "parallelism.default=1"
+        };
+        List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+        assertThat(lines).hasSize(expectedResultLines);
+        assertThat(lines).doesNotContain("parallelism.default: 1");
+    }
+
+    /**
+     * Passes {@code -rmKV parallelism.default=2}, whose value does not match the configured one,
+     * and checks that the number of result lines is unchanged and the entry is still present.
+     */
+    @Test
+    void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception {
+        int expectedResultLines = 13;
+        String[] commands = {
+            RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+            BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
+            String.valueOf(expectedResultLines),
+            "-rmKV",
+            "parallelism.default=2"
+        };
+        List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+        assertThat(lines).hasSize(expectedResultLines);
+        assertThat(lines).contains("parallelism.default: 1");
+    }
+
+    /**
+     * Passes {@code -repKV parallelism.default,1,2}, whose old value matches the configured one,
+     * and checks that the entry now carries the new value while the line count is unchanged.
+     */
+    @Test
+    void testGetConfigurationReplaceKeyValue() throws Exception {
+        int expectedResultLines = 13;
+        String[] commands = {
+            RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+            BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
+            String.valueOf(expectedResultLines),
+            "-repKV",
+            "parallelism.default,1,2"
+        };
+        List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+        assertThat(lines).hasSize(expectedResultLines);
+        assertThat(lines).doesNotContain("parallelism.default: 1");
+        assertThat(lines).contains("parallelism.default: 2");
+    }
+
+    /**
+     * Passes {@code -repKV parallelism.default,2,3}, whose old value does not match the
+     * configured one, and checks that nothing is replaced: the new value is absent and the
+     * original entry is still present.
+     */
+    @Test
+    void testGetConfigurationReplaceKeyValueNotMatchingValue() throws Exception {
+        int expectedResultLines = 13;
+        String[] commands = {
+            RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+            BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
+            String.valueOf(expectedResultLines),
+            "-repKV",
+            "parallelism.default,2,3"
+        };
+        List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+        assertThat(lines).hasSize(expectedResultLines);
+        assertThat(lines).doesNotContain("parallelism.default: 3");
+        // Same check as the non-matching rmKV test: the original entry must survive untouched.
+        assertThat(lines).contains("parallelism.default: 1");
+    }
+
     private static Map<String, String> parseAndAssertDynamicParameters(
             String dynamicParametersStr) {
         Set<String> expectedDynamicParameters =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfiguration.java
new file mode 100644
index 00000000000..f454df81492
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfiguration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Value holder for parsed command line arguments: the configuration directory, the dynamic
+ * properties, the keys and key-value pairs to remove, the key-value replacements and the flatten
+ * flag.
+ */
+public class ModifiableClusterConfiguration {
+
+    // Directory the configuration is read from.
+    private final String configDir;
+
+    // Properties supplied dynamically on the command line.
+    private final Properties dynamicProperties;
+
+    // Keys to remove.
+    private final List<String> removeKeys;
+
+    // Key-value pairs to remove.
+    private final Properties removeKeyValues;
+
+    // Replacement triples.
+    private final List<Tuple3<String, String, String>> replaceKeyValues;
+
+    // Whether the flatten option was given.
+    private final boolean flattenConfig;
+
+    public ModifiableClusterConfiguration(
+            boolean flattenConfig,
+            String configDir,
+            Properties dynamicProperties,
+            Properties removeKeyValues,
+            List<String> removeKeys,
+            List<Tuple3<String, String, String>> replaceKeyValues) {
+        this.configDir = configDir;
+        this.dynamicProperties = dynamicProperties;
+        this.removeKeys = removeKeys;
+        this.removeKeyValues = removeKeyValues;
+        this.replaceKeyValues = replaceKeyValues;
+        this.flattenConfig = flattenConfig;
+    }
+
+    public String getConfigDir() {
+        return configDir;
+    }
+
+    public Properties getDynamicProperties() {
+        return dynamicProperties;
+    }
+
+    public List<String> getRemoveKeys() {
+        return removeKeys;
+    }
+
+    public Properties getRemoveKeyValues() {
+        return removeKeyValues;
+    }
+
+    public List<Tuple3<String, String, String>> getReplaceKeyValues() {
+        return replaceKeyValues;
+    }
+
+    public boolean flattenConfig() {
+        return flattenConfig;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactory.java
new file mode 100644
index 00000000000..3b82ee54199
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.FLATTEN_CONFIG_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.REMOVE_KEY_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.REMOVE_KEY_VALUE_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.REPLACE_KEY_VALUE_OPTION;
+
+/**
+ * Parser factory that turns a parsed command line into a {@link ModifiableClusterConfiguration},
+ * i.e. the configuration directory plus the requested modifications.
+ */
+public class ModifiableClusterConfigurationParserFactory
+        implements ParserResultFactory<ModifiableClusterConfiguration> {
+
+    /** Returns the command line options understood by this factory. */
+    public static Options options() {
+        final Options options = new Options();
+        options.addOption(CONFIG_DIR_OPTION);
+        options.addOption(REMOVE_KEY_OPTION);
+        options.addOption(REMOVE_KEY_VALUE_OPTION);
+        options.addOption(REPLACE_KEY_VALUE_OPTION);
+        options.addOption(DYNAMIC_PROPERTY_OPTION);
+        options.addOption(FLATTEN_CONFIG_OPTION);
+
+        return options;
+    }
+
+    @Override
+    public Options getOptions() {
+        return options();
+    }
+
+    /**
+     * Collects the option values of the given command line.
+     *
+     * @param commandLine the parsed command line
+     * @return the configuration directory, dynamic properties, removals, replacements and flatten
+     *     flag found on the command line; absent list-valued options yield empty lists
+     */
+    @Override
+    public ModifiableClusterConfiguration createResult(@Nonnull CommandLine commandLine) {
+        String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());
+
+        Properties dynamicProperties =
+                commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
+
+        List<String> removeKeyList = new ArrayList<>();
+        String[] removeKeys = commandLine.getOptionValues(REMOVE_KEY_OPTION.getOpt());
+        if (removeKeys != null) {
+            removeKeyList = Arrays.asList(removeKeys);
+        }
+
+        Properties removeKeyValues =
+                commandLine.getOptionProperties(REMOVE_KEY_VALUE_OPTION.getOpt());
+
+        List<Tuple3<String, String, String>> replaceKeyValueList = new ArrayList<>();
+        String[] replaceKeyValues = commandLine.getOptionValues(REPLACE_KEY_VALUE_OPTION.getOpt());
+        if (replaceKeyValues != null) {
+            // The values of all occurrences arrive as one flat array; every three consecutive
+            // entries form one (key, oldValue, newValue) triple.
+            for (int i = 0; i < replaceKeyValues.length; i += 3) {
+                replaceKeyValueList.add(
+                        new Tuple3<>(
+                                replaceKeyValues[i],
+                                replaceKeyValues[i + 1],
+                                replaceKeyValues[i + 2]));
+            }
+        }
+
+        return new ModifiableClusterConfiguration(
+                commandLine.hasOption(FLATTEN_CONFIG_OPTION),
+                configDir,
+                dynamicProperties,
+                removeKeyValues,
+                removeKeyList,
+                replaceKeyValueList);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ConfigurationCommandLineOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ConfigurationCommandLineOptions.java
new file mode 100644
index 00000000000..c7195ed6516
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ConfigurationCommandLineOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.parser;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.commons.cli.Option;
+
+/** Represents the set of command-line options related to update and get configuration. */
+@Internal
+public class ConfigurationCommandLineOptions {
+
+    /** {@code -repKV key,oldValue,newValue}: three comma-separated values per occurrence. */
+    public static final Option REPLACE_KEY_VALUE_OPTION =
+            Option.builder("repKV")
+                    .argName("key,oldValue,newValue")
+                    .longOpt("replaceKeyValue")
+                    .numberOfArgs(3)
+                    .valueSeparator(',')
+                    .desc(
+                            "Replace the specified key's value with a new one if it matches the old value.")
+                    .build();
+
+    /** {@code -rmKV key=value}: two values per occurrence, separated by {@code =}. */
+    public static final Option REMOVE_KEY_VALUE_OPTION =
+            Option.builder("rmKV")
+                    .argName("key=value")
+                    .longOpt("removeKeyValue")
+                    .numberOfArgs(2)
+                    .valueSeparator('=')
+                    .desc("Remove the specified key-value pairs if it matches the old value.")
+                    .build();
+
+    /** {@code -rmKey key}: a single key per occurrence. */
+    public static final Option REMOVE_KEY_OPTION =
+            Option.builder("rmKey")
+                    .argName("Key")
+                    .longOpt("removeKey")
+                    .hasArg(true)
+                    .desc("Key to remove from the configuration.")
+                    .build();
+
+    /** {@code -flatten}: a flag without argument. */
+    public static final Option FLATTEN_CONFIG_OPTION =
+            Option.builder("flatten")
+                    .argName("flatten configuration")
+                    .longOpt("flattenConfig")
+                    .hasArg(false)
+                    .desc(
+                            "If present, the configuration will be output in a flattened format instead of nested YAML.")
+                    .build();
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
index 4737dd2760b..1e1e26ec433 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -26,6 +28,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
 import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.ModifiableClusterConfiguration;
+import 
org.apache.flink.runtime.entrypoint.ModifiableClusterConfigurationParserFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.MathUtils;
@@ -33,6 +37,10 @@ import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
 import static org.apache.flink.util.MathUtils.checkedDownCast;
 
 /**
@@ -144,4 +152,64 @@ public class ConfigurationParserUtils {
         return GlobalConfiguration.loadConfiguration(
                 clusterConfiguration.getConfigDir(), dynamicProperties);
     }
+
+    public static List<String> loadAndModifyConfiguration(String[] args, 
String cmdLineSyntax)
+            throws FlinkParseException {
+        final CommandLineParser<ModifiableClusterConfiguration> 
commandLineParser =
+                new CommandLineParser<>(new 
ModifiableClusterConfigurationParserFactory());
+
+        final ModifiableClusterConfiguration modifiableClusterConfiguration;
+        try {
+            modifiableClusterConfiguration = commandLineParser.parse(args);
+        } catch (FlinkParseException e) {
+            LOG.error("Could not parse the command line options.", e);
+            commandLineParser.printHelp(cmdLineSyntax);
+            throw e;
+        }
+
+        final Configuration dynamicProperties =
+                ConfigurationUtils.createConfiguration(
+                        modifiableClusterConfiguration.getDynamicProperties());
+        // 1. Load configuration and append dynamic properties to 
configuration.
+        Configuration configuration =
+                GlobalConfiguration.loadConfiguration(
+                        modifiableClusterConfiguration.getConfigDir(), 
dynamicProperties);
+
+        // 2. Replace the specified key's value with a new one if it matches 
the old value.
+        List<Tuple3<String, String, String>> replaceKeyValues =
+                modifiableClusterConfiguration.getReplaceKeyValues();
+        replaceKeyValues.forEach(
+                tuple3 -> {
+                    String key = tuple3.f0;
+                    String oldValue = tuple3.f1;
+                    String newValue = tuple3.f2;
+                    if (oldValue.equals(
+                            configuration.get(
+                                    
ConfigOptions.key(key).stringType().noDefaultValue()))) {
+                        configuration.setString(key, newValue);
+                    }
+                });
+
+        // 3. Remove the specified key value pairs if the value matches.
+        Properties removeKeyValues = 
modifiableClusterConfiguration.getRemoveKeyValues();
+        final Set<String> propertyNames = 
removeKeyValues.stringPropertyNames();
+
+        for (String propertyName : propertyNames) {
+            if (removeKeyValues
+                    .getProperty(propertyName)
+                    .equals(
+                            configuration.getString(
+                                    ConfigOptions.key(propertyName)
+                                            .stringType()
+                                            .noDefaultValue()))) {
+                configuration.removeKey(propertyName);
+            }
+        }
+
+        // 4. Remove the specified keys, regardless of their values.
+        List<String> removeKeys = 
modifiableClusterConfiguration.getRemoveKeys();
+        removeKeys.forEach(configuration::removeKey);
+        return ConfigurationUtils.convertConfigToWritableLines(
+                configuration, modifiableClusterConfiguration.flattenConfig());
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
index ed7e8fa62fc..443e2814821 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java
@@ -61,12 +61,13 @@ public class BashJavaUtils {
 
     private static List<String> runCommand(Command command, String[] 
commandArgs)
             throws FlinkException {
-        Configuration configuration = 
FlinkConfigLoader.loadConfiguration(commandArgs);
         switch (command) {
             case GET_TM_RESOURCE_PARAMS:
-                return getTmResourceParams(configuration);
+                return 
getTmResourceParams(FlinkConfigLoader.loadConfiguration(commandArgs));
             case GET_JM_RESOURCE_PARAMS:
-                return getJmResourceParams(configuration);
+                return 
getJmResourceParams(FlinkConfigLoader.loadConfiguration(commandArgs));
+            case UPDATE_AND_GET_FLINK_CONFIGURATION:
+                return 
FlinkConfigLoader.loadAndModifyConfiguration(commandArgs);
             default:
                 // unexpected, Command#valueOf should fail if a unknown 
command is passed in
                 throw new RuntimeException("Unexpected, something is wrong.");
@@ -175,6 +176,9 @@ public class BashJavaUtils {
         GET_TM_RESOURCE_PARAMS,
 
         /** Get JVM parameters and dynamic configs of job manager resources. */
-        GET_JM_RESOURCE_PARAMS
+        GET_JM_RESOURCE_PARAMS,
+
+        /** Update and get configuration from conf file and dynamic configs of 
the FLINK cluster. */
+        UPDATE_AND_GET_FLINK_CONFIGURATION
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java
index 4f18c04e64e..111a4e74a42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.util.bash;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
+import 
org.apache.flink.runtime.entrypoint.ModifiableClusterConfigurationParserFactory;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -36,22 +37,27 @@ import java.util.List;
  */
 public class FlinkConfigLoader {
 
-    private static final Options CMD_OPTIONS = 
ClusterConfigurationParserFactory.options();
-
     public static Configuration loadConfiguration(String[] args) throws 
FlinkException {
         return ConfigurationParserUtils.loadCommonConfiguration(
-                filterCmdArgs(args), BashJavaUtils.class.getSimpleName());
+                filterCmdArgs(args, 
ClusterConfigurationParserFactory.options()),
+                BashJavaUtils.class.getSimpleName());
+    }
+
+    public static List<String> loadAndModifyConfiguration(String[] args) 
throws FlinkException {
+        return ConfigurationParserUtils.loadAndModifyConfiguration(
+                filterCmdArgs(args, 
ModifiableClusterConfigurationParserFactory.options()),
+                BashJavaUtils.class.getSimpleName());
     }
 
-    private static String[] filterCmdArgs(String[] args) {
+    private static String[] filterCmdArgs(String[] args, Options options) {
         final List<String> filteredArgs = new ArrayList<>();
         final Iterator<String> iter = Arrays.asList(args).iterator();
 
         while (iter.hasNext()) {
             String token = iter.next();
-            if (CMD_OPTIONS.hasOption(token)) {
+            if (options.hasOption(token)) {
                 filteredArgs.add(token);
-                if (CMD_OPTIONS.getOption(token).hasArg() && iter.hasNext()) {
+                if (options.getOption(token).hasArg() && iter.hasNext()) {
                     filteredArgs.add(iter.next());
                 }
             } else if (token.startsWith("-D")) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactoryTest.java
new file mode 100644
index 00000000000..05659e900e7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactoryTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the {@link ModifiableClusterConfigurationParserFactory}. */
+class ModifiableClusterConfigurationParserFactoryTest {
+
+    private static final CommandLineParser<ModifiableClusterConfiguration> 
commandLineParser =
+            new CommandLineParser<>(new 
ModifiableClusterConfigurationParserFactory());
+
+    @Test
+    void testModifiableClusterConfigurationParsing() throws 
FlinkParseException {
+        final String configDir = "/foo/bar";
+        final String key = "key";
+        final String value = "value";
+        final String newValue = "value2";
+        final String[] args = {
+            "--configDir",
+            configDir,
+            "--removeKey",
+            key,
+            String.format("-D%s=%s", key, value),
+            "--removeKeyValue",
+            String.format("%s=%s", key, value),
+            "--replaceKeyValue",
+            String.format("%s,%s,%s", key, value, newValue),
+            "--flattenConfig"
+        };
+
+        ModifiableClusterConfiguration modifiableClusterConfiguration =
+                commandLineParser.parse(args);
+
+        
assertThat(modifiableClusterConfiguration.getConfigDir()).isEqualTo(configDir);
+
+        Properties dynamicProperties = 
modifiableClusterConfiguration.getDynamicProperties();
+        assertThat(dynamicProperties).containsEntry(key, value);
+
+        List<String> removeKeys = 
modifiableClusterConfiguration.getRemoveKeys();
+        assertThat(removeKeys).containsExactly(key);
+
+        Properties removeKeyValues = 
modifiableClusterConfiguration.getRemoveKeyValues();
+        assertThat(removeKeyValues).containsEntry(key, value);
+
+        List<Tuple3<String, String, String>> replaceKeyValues =
+                modifiableClusterConfiguration.getReplaceKeyValues();
+        assertThat(replaceKeyValues).containsExactly(Tuple3.of(key, value, 
newValue));
+
+        assertThat(modifiableClusterConfiguration.flattenConfig()).isTrue();
+    }
+
+    @Test
+    void testOnlyRequiredArguments() throws FlinkParseException {
+        final String configDir = "/foo/bar";
+        final String[] args = {"--configDir", configDir};
+
+        ModifiableClusterConfiguration modifiableClusterConfiguration =
+                commandLineParser.parse(args);
+
+        
assertThat(modifiableClusterConfiguration.getConfigDir()).isEqualTo(configDir);
+    }
+
+    @Test
+    void testMissingRequiredArgument() {
+        final String[] args = {};
+
+        assertThatThrownBy(() -> commandLineParser.parse(args))
+                .isInstanceOf(FlinkParseException.class);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
index 763a994cf47..076a5f367c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java
@@ -37,6 +37,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -88,6 +89,17 @@ public class FlinkConfigLoaderTest {
         verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
     }
 
+    @TestTemplate
+    void testloadAndModifyConfigurationConfigDirLongOpt() throws Exception {
+        String[] args = {"--configDir", confDir.toFile().getAbsolutePath()};
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
+        } else {
+            assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + 
TEST_CONFIG_VALUE);
+        }
+    }
+
     @TestTemplate
     void testLoadConfigurationConfigDirShortOpt() throws Exception {
         String[] args = {"-c", confDir.toFile().getAbsolutePath()};
@@ -95,6 +107,17 @@ public class FlinkConfigLoaderTest {
         verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE);
     }
 
+    @TestTemplate
+    void testloadAndModifyConfigurationConfigDirShortOpt() throws Exception {
+        String[] args = {"-c", confDir.toFile().getAbsolutePath()};
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
+        } else {
+            assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + 
TEST_CONFIG_VALUE);
+        }
+    }
+
     @TestTemplate
     void testLoadConfigurationDynamicPropertyWithSpace() throws Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-D", "key=value"};
@@ -102,6 +125,19 @@ public class FlinkConfigLoaderTest {
         verifyConfiguration(configuration, "key", "value");
     }
 
+    @TestTemplate
+    void testloadAndModifyConfigurationDynamicPropertyWithSpace() throws 
Exception {
+        String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-D", "key=value"};
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
+        } else {
+            assertThat(list)
+                    .containsExactlyInAnyOrder(
+                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: 
value");
+        }
+    }
+
     @TestTemplate
     void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception {
         String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-Dkey=value"};
@@ -109,6 +145,19 @@ public class FlinkConfigLoaderTest {
         verifyConfiguration(configuration, "key", "value");
     }
 
+    @TestTemplate
+    void testloadAndModifyConfigurationDynamicPropertyWithoutSpace() throws 
Exception {
+        String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), 
"-Dkey=value"};
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
+        } else {
+            assertThat(list)
+                    .containsExactlyInAnyOrder(
+                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: 
value");
+        }
+    }
+
     @TestTemplate
     void testLoadConfigurationIgnoreUnknownToken() throws Exception {
         String[] args = {
@@ -124,6 +173,159 @@ public class FlinkConfigLoaderTest {
         verifyConfiguration(configuration, "key", "value");
     }
 
+    @TestTemplate
+    void testloadAndModifyConfigurationIgnoreUnknownToken() throws Exception {
+        String[] args = {
+            "unknown",
+            "-u",
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            "--unknown",
+            "-Dkey=value"
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE, "key: value");
+        } else {
+            assertThat(list)
+                    .containsExactlyInAnyOrder(
+                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: 
value");
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationRemoveKeysMatched() throws Exception {
+        String key = "key";
+
+        String[] args = {
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            String.format("-D%s=value", key),
+            "--removeKey",
+            key
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
+        } else {
+            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ TEST_CONFIG_VALUE);
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationRemoveKeysNotMatched() throws Exception 
{
+        String key = "key";
+        String value = "value";
+        String removeKey = "removeKey";
+
+        String[] args = {
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            String.format("-D%s=%s", key, value),
+            "--removeKey",
+            removeKey
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list)
+                    .containsExactly("test:", "  key: " + TEST_CONFIG_VALUE, 
key + ": " + value);
+        } else {
+            assertThat(list)
+                    .containsExactlyInAnyOrder(
+                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, key + 
": " + value);
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationRemoveKeyValuesMatched() throws 
Exception {
+        String removeKey = "removeKey";
+        String removeValue = "removeValue";
+
+        String[] args = {
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            String.format("-D%s=%s", removeKey, removeValue),
+            "--removeKeyValue",
+            String.format("%s=%s", removeKey, removeValue)
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
+        } else {
+            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ TEST_CONFIG_VALUE);
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationRemoveKeyValuesNotMatched() throws 
Exception {
+        String removeKey = "removeKey";
+        String removeValue = "removeValue";
+        String nonExistentValue = "nonExistentValue";
+
+        String[] args = {
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            String.format("-D%s=%s", removeKey, removeValue),
+            "--removeKeyValue",
+            String.format("%s=%s", removeKey, nonExistentValue)
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list)
+                    .containsExactlyInAnyOrder(
+                            "test:", "  key: " + TEST_CONFIG_VALUE, removeKey 
+ ": " + removeValue);
+        } else {
+            assertThat(list)
+                    .containsExactlyInAnyOrder(
+                            TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE,
+                            removeKey + ": " + removeValue);
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationReplaceKeyValuesMatched() throws 
Exception {
+        String newValue = "newValue";
+
+        String[] args = {
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            "--replaceKeyValue",
+            String.format("%s,%s,%s", TEST_CONFIG_KEY, TEST_CONFIG_VALUE, 
newValue)
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + newValue);
+        } else {
+            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ newValue);
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationReplaceKeyValuesNotMatched() throws 
Exception {
+        String nonExistentValue = "nonExistentValue";
+        String newValue = "newValue";
+
+        String[] args = {
+            "--configDir",
+            confDir.toFile().getAbsolutePath(),
+            "--replaceKeyValue",
+            String.format("%s,%s,%s", TEST_CONFIG_KEY, nonExistentValue, 
newValue)
+        };
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        if (standardYaml) {
+            assertThat(list).containsExactly("test:", "  key: " + 
TEST_CONFIG_VALUE);
+        } else {
+            assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " 
+ TEST_CONFIG_VALUE);
+        }
+    }
+
+    @TestTemplate
+    void testloadAndModifyConfigurationWithFlatten() throws Exception {
+        String[] args = {"-c", confDir.toFile().getAbsolutePath(), "-flatten"};
+        List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args);
+        assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + 
TEST_CONFIG_VALUE);
+    }
+
     private void verifyConfiguration(Configuration config, String key, String 
expectedValue) {
         ConfigOption<String> option = key(key).stringType().noDefaultValue();
         assertThat(config.get(option)).isEqualTo(expectedValue);

Reply via email to