This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c148b62166d8fec9e7e525a836de890c2f12973b Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Fri Jan 12 21:56:15 2024 +0800 [FLINK-33721][core] Extend BashJavaUtils to support reading and writing standard yaml file. This closes #24091. --- .../src/main/flink-bin/bin/bash-java-utils.sh | 170 +++++++++++++++++ .../flink-bin/bin/config-parser-utils.sh} | 32 ++-- flink-dist/src/main/flink-bin/bin/config.sh | 143 +-------------- flink-dist/src/test/bin/runBashJavaUtilsCmd.sh | 2 +- .../src/test/bin/runExtractLoggingOutputs.sh | 2 +- .../org/apache/flink/dist/BashJavaUtilsITCase.java | 94 ++++++++++ .../entrypoint/ModifiableClusterConfiguration.java | 82 +++++++++ ...odifiableClusterConfigurationParserFactory.java | 98 ++++++++++ .../parser/ConfigurationCommandLineOptions.java | 64 +++++++ .../runtime/util/ConfigurationParserUtils.java | 68 +++++++ .../flink/runtime/util/bash/BashJavaUtils.java | 12 +- .../flink/runtime/util/bash/FlinkConfigLoader.java | 18 +- ...iableClusterConfigurationParserFactoryTest.java | 96 ++++++++++ .../runtime/util/bash/FlinkConfigLoaderTest.java | 202 +++++++++++++++++++++ 14 files changed, 920 insertions(+), 163 deletions(-) diff --git a/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh new file mode 100755 index 00000000000..f4d02327197 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/bash-java-utils.sh @@ -0,0 +1,170 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +UNAME=$(uname -s) +if [ "${UNAME:0:6}" == "CYGWIN" ]; then + JAVA_RUN=java +else + if [[ -d "$JAVA_HOME" ]]; then + JAVA_RUN="$JAVA_HOME"/bin/java + else + JAVA_RUN=java + fi +fi + +manglePathList() { + UNAME=$(uname -s) + # a path list, for example a java classpath + if [ "${UNAME:0:6}" == "CYGWIN" ]; then + echo `cygpath -wp "$1"` + else + echo $1 + fi +} + +findFlinkDistJar() { + local FLINK_DIST + local LIB_DIR + if [[ -n "$1" ]]; then + LIB_DIR="$1" + else + LIB_DIR="$FLINK_LIB_DIR" + fi + FLINK_DIST="$(find "$LIB_DIR" -name 'flink-dist*.jar')" + local FLINK_DIST_COUNT + FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)" + + # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored + # as the classpath and exit function with empty classpath to force process failure + if [[ "$FLINK_DIST" == "" ]]; then + (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.") + exit 1 + elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then + (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. Please resolve.") + exit 1 + fi + + echo "$FLINK_DIST" +} + +runBashJavaUtilsCmd() { + local cmd=$1 + local conf_dir=$2 + local class_path=$3 + local dynamic_args=${@:4} + class_path=`manglePathList "${class_path}"` + + local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000` + if [[ $? 
-ne 0 ]]; then + echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2 + # Print the output in case the user redirect the log to console. + echo "$output" 1>&2 + exit 1 + fi + + echo "$output" +} + +updateAndGetFlinkConfiguration() { + local FLINK_CONF_DIR="$1" + local FLINK_BIN_DIR="$2" + local FLINK_LIB_DIR="$3" + local command_result + command_result=$(parseConfigurationAndExportLogs "$FLINK_CONF_DIR" "$FLINK_BIN_DIR" "$FLINK_LIB_DIR" "UPDATE_AND_GET_FLINK_CONFIGURATION" "${@:4}") + echo "$command_result" +} + +parseConfigurationAndExportLogs() { + local FLINK_CONF_DIR="$1" + local FLINK_BIN_DIR="$2" + local FLINK_LIB_DIR="$3" + local COMMAND="$4" + local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" + + java_utils_output=$(runBashJavaUtilsCmd "${COMMAND}" "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar ${FLINK_LIB_DIR})" "${@:5}") + logging_output=$(extractLoggingOutputs "${java_utils_output}") + execution_results=$(echo "${java_utils_output}" | grep ${EXECUTION_PREFIX}) + + if [[ $? -ne 0 ]]; then + echo "[ERROR] Could not parse configurations properly." + echo "[ERROR] Raw output from BashJavaUtils:" + echo "$java_utils_output" + exit 1 + fi + + echo "${execution_results//${EXECUTION_PREFIX}/}" +} + +extractLoggingOutputs() { + local output="$1" + local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" + + echo "${output}" | grep -v ${EXECUTION_PREFIX} +} + +extractExecutionResults() { + local output="$1" + local expected_lines="$2" + local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" + local execution_results + local num_lines + + execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX}) + num_lines=$(echo "${execution_results}" | wc -l) + # explicit check for empty result, because if execution_results is empty, then wc returns 1 + if [[ -z ${execution_results} ]]; then + echo "[ERROR] The execution result is empty." 
1>&2 + exit 1 + fi + if [[ ${num_lines} -ne ${expected_lines} ]]; then + echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2 + echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2 + echo "$output" 1>&2 + exit 1 + fi + + echo "${execution_results//${EXECUTION_PREFIX}/}" +} + +parseResourceParamsAndExportLogs() { + local cmd=$1 + java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}") + logging_output=$(extractLoggingOutputs "${java_utils_output}") + params_output=$(extractExecutionResults "${java_utils_output}" 2) + + if [[ $? -ne 0 ]]; then + echo "[ERROR] Could not get JVM parameters and dynamic configurations properly." + echo "[ERROR] Raw output from BashJavaUtils:" + echo "$java_utils_output" + exit 1 + fi + + jvm_params=$(echo "${params_output}" | head -n1) + export JVM_ARGS="${JVM_ARGS} ${jvm_params}" + export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1) + + export FLINK_INHERITED_LOGS=" +$FLINK_INHERITED_LOGS + +RESOURCE_PARAMS extraction logs: +jvm_params: $jvm_params +dynamic_configs: $DYNAMIC_PARAMETERS +logs: $logging_output +" +} diff --git a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh similarity index 65% copy from flink-dist/src/test/bin/runExtractLoggingOutputs.sh copy to flink-dist/src/main/flink-bin/bin/config-parser-utils.sh index d8929012731..cfed6db8618 100755 --- a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh +++ b/flink-dist/src/main/flink-bin/bin/config-parser-utils.sh @@ -17,22 +17,28 @@ # limitations under the License. 
################################################################################ -USAGE="Usage: runExtractLoggingOutputs.sh <input>" +USAGE="Usage: config-parser-utils.sh FLINK_CONF_DIR FLINK_BIN_DIR FLINK_LIB_DIR [dynamic args...]" -INPUT="$1" - -if [[ -z "${INPUT}" ]]; then - echo "$USAGE" - exit 1 +if [ "$#" -lt 3 ]; then + echo "$USAGE" + exit 1 fi -bin=`dirname "$0"` -bin=`cd "$bin"; pwd` +source "$2"/bash-java-utils.sh + +ARGS=("${@:1}") +result=$(updateAndGetFlinkConfiguration "${ARGS[@]}") -FLINK_CONF_DIR=${bin}/../../main/resources -FLINK_TARGET_DIR=${bin}/../../../target -FLINK_DIST_JAR=`find $FLINK_TARGET_DIR -name 'flink-dist*.jar'` +if [[ $? -ne 0 ]]; then + echo "[ERROR] Could not get configurations properly, the result is :" + echo "$result" + exit 1 +fi -. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null +CONF_FILE="$1/flink-conf.yaml" +if [ ! -e "$1/flink-conf.yaml" ]; then + CONF_FILE="$1/config.yaml" +fi; -extractLoggingOutputs "${INPUT}" +# Output the result +echo "${result}" > "$CONF_FILE"; diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index c798bdf4c01..d90e4362f7e 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -47,25 +47,6 @@ constructFlinkClassPath() { echo "$FLINK_CLASSPATH""$FLINK_DIST" } -findFlinkDistJar() { - local FLINK_DIST - FLINK_DIST="$(find "$FLINK_LIB_DIR" -name 'flink-dist*.jar')" - local FLINK_DIST_COUNT - FLINK_DIST_COUNT="$(echo "$FLINK_DIST" | wc -l)" - - # If flink-dist*.jar cannot be resolved write error messages to stderr since stdout is stored - # as the classpath and exit function with empty classpath to force process failure - if [[ "$FLINK_DIST" == "" ]]; then - (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.") - exit 1 - elif [[ "$FLINK_DIST_COUNT" -gt 1 ]]; then - (>&2 echo "[ERROR] Multiple flink-dist*.jar found in $FLINK_LIB_DIR. 
Please resolve.") - exit 1 - fi - - echo "$FLINK_DIST" -} - findSqlGatewayJar() { local SQL_GATEWAY SQL_GATEWAY="$(find "$FLINK_OPT_DIR" -name 'flink-sql-gateway*.jar')" @@ -116,16 +97,6 @@ manglePath() { fi } -manglePathList() { - UNAME=$(uname -s) - # a path list, for example a java classpath - if [ "${UNAME:0:6}" == "CYGWIN" ]; then - echo `cygpath -wp "$1"` - else - echo $1 - fi -} - # Looks up a config value by key from a simple YAML-style key-value map. # $1: key to look up # $2: default value to return if key does not exist @@ -133,11 +104,9 @@ manglePathList() { readFromConfig() { local key=$1 local defaultValue=$2 - local configFile=$3 + local configuration=$3 - # first extract the value with the given key (1st sed), then trim the result (2nd sed) - # if a key exists multiple times, take the "last" one (tail) - local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1` + local value=$(echo "$configuration" | grep "^[ ]*${key}[ ]*:" | cut -d ':' -f2- | sed "s/^ *//;s/ *$//" | tail -n 1) [ -z "$value" ] && echo "$defaultValue" || echo "$value" } @@ -147,7 +116,6 @@ readFromConfig() { # -or- the respective environment variables are not set. ######################################################################################################################## - # WARNING !!! 
, these values are only used if there is nothing else is specified in # conf/flink-conf.yaml @@ -234,8 +202,6 @@ FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"` if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log -FLINK_CONF_FILE="flink-conf.yaml" -YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE} ### Exported environment variables ### export FLINK_CONF_DIR @@ -246,36 +212,13 @@ export FLINK_LIB_DIR # export /opt dir to access it for the SQL client export FLINK_OPT_DIR +source "${FLINK_BIN_DIR}/bash-java-utils.sh" +YAML_CONF=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}" ${FLINK_LIB_DIR} -flatten) + ######################################################################################################################## # ENVIRONMENT VARIABLES ######################################################################################################################## -# read JAVA_HOME from config with no default value -MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}") -# check if config specified JAVA_HOME -if [ -z "${MY_JAVA_HOME}" ]; then - # config did not specify JAVA_HOME. Use system JAVA_HOME - MY_JAVA_HOME="${JAVA_HOME}" -fi -# check if we have a valid JAVA_HOME and if java is not available -if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then - echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME." 
- exit 1 -else - JAVA_HOME="${MY_JAVA_HOME}" -fi - -UNAME=$(uname -s) -if [ "${UNAME:0:6}" == "CYGWIN" ]; then - JAVA_RUN=java -else - if [[ -d "$JAVA_HOME" ]]; then - JAVA_RUN="$JAVA_HOME"/bin/java - else - JAVA_RUN=java - fi -fi - # Define HOSTNAME if it is not already set if [ -z "${HOSTNAME}" ]; then HOSTNAME=`hostname` @@ -561,82 +504,6 @@ TMWorkers() { fi } -runBashJavaUtilsCmd() { - local cmd=$1 - local conf_dir=$2 - local class_path=$3 - local dynamic_args=${@:4} - class_path=`manglePathList "${class_path}"` - - local output=`"${JAVA_RUN}" -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000` - if [[ $? -ne 0 ]]; then - echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2 - # Print the output in case the user redirect the log to console. - echo "$output" 1>&2 - exit 1 - fi - - echo "$output" -} - -extractExecutionResults() { - local output="$1" - local expected_lines="$2" - local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" - local execution_results - local num_lines - - execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX}) - num_lines=$(echo "${execution_results}" | wc -l) - # explicit check for empty result, because if execution_results is empty, then wc returns 1 - if [[ -z ${execution_results} ]]; then - echo "[ERROR] The execution result is empty." 1>&2 - exit 1 - fi - if [[ ${num_lines} -ne ${expected_lines} ]]; then - echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 
1>&2 - echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2 - echo "$output" 1>&2 - exit 1 - fi - - echo "${execution_results//${EXECUTION_PREFIX}/}" -} - -extractLoggingOutputs() { - local output="$1" - local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" - - echo "${output}" | grep -v ${EXECUTION_PREFIX} -} - -parseResourceParamsAndExportLogs() { - local cmd=$1 - java_utils_output=$(runBashJavaUtilsCmd ${cmd} "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "${@:2}") - logging_output=$(extractLoggingOutputs "${java_utils_output}") - params_output=$(extractExecutionResults "${java_utils_output}" 2) - - if [[ $? -ne 0 ]]; then - echo "[ERROR] Could not get JVM parameters and dynamic configurations properly." - echo "[ERROR] Raw output from BashJavaUtils:" - echo "$java_utils_output" - exit 1 - fi - - jvm_params=$(echo "${params_output}" | head -n1) - export JVM_ARGS="${JVM_ARGS} ${jvm_params}" - export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1) - - export FLINK_INHERITED_LOGS=" -$FLINK_INHERITED_LOGS - -RESOURCE_PARAMS extraction logs: -jvm_params: $jvm_params -dynamic_configs: $DYNAMIC_PARAMETERS -logs: $logging_output -" -} - parseJmArgsAndExportLogs() { parseResourceParamsAndExportLogs GET_JM_RESOURCE_PARAMS "$@" } diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh index 41efe8ad1ee..e0340146b87 100755 --- a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh +++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh @@ -37,7 +37,7 @@ FLINK_TARGET_DIR=${bin}/../../../target FLINK_DIST_JARS=(`find ${FLINK_TARGET_DIR} -maxdepth 1 -name 'flink-dist*.jar'`) FLINK_DIST_CLASSPATH=`echo ${FLINK_DIST_JARS[@]} | tr ' ' ':'` -. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null +. 
${bin}/../../main/flink-bin/bin/bash-java-utils.sh > /dev/null output=$(runBashJavaUtilsCmd ${COMMAND} ${FLINK_CONF_DIR} "$FLINK_TARGET_DIR/bash-java-utils.jar:${FLINK_DIST_CLASSPATH}" $DYNAMIC_OPTS) extractExecutionResults "${output}" ${EXPECTED_LINES} diff --git a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh b/flink-dist/src/test/bin/runExtractLoggingOutputs.sh index d8929012731..cd04666afa5 100755 --- a/flink-dist/src/test/bin/runExtractLoggingOutputs.sh +++ b/flink-dist/src/test/bin/runExtractLoggingOutputs.sh @@ -33,6 +33,6 @@ FLINK_CONF_DIR=${bin}/../../main/resources FLINK_TARGET_DIR=${bin}/../../../target FLINK_DIST_JAR=`find $FLINK_TARGET_DIR -name 'flink-dist*.jar'` -. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null +. ${bin}/../../main/flink-bin/bin/bash-java-utils.sh > /dev/null extractLoggingOutputs "${INPUT}" diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java index e9c0e1cf8c2..744ae1dccdd 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java @@ -120,6 +120,100 @@ class BashJavaUtilsITCase extends JavaBashTestBase { assertThat(Long.valueOf(jvmParams.get("-XX:MaxMetaspaceSize="))).isEqualTo(metaspace); } + @Test + void testGetConfiguration() throws Exception { + int expectedResultLines = 13; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), + String.valueOf(expectedResultLines) + }; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines).hasSize(expectedResultLines); + } + + @Test + void testGetConfigurationRemoveKey() throws Exception { + int expectedResultLines = 12; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + 
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), + String.valueOf(expectedResultLines), + "-rmKey", + "parallelism.default" + }; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines).hasSize(expectedResultLines); + assertThat(lines).doesNotContain("parallelism.default: 1"); + } + + @Test + void testGetConfigurationRemoveKeyValue() throws Exception { + int expectedResultLines = 12; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), + String.valueOf(expectedResultLines), + "-rmKV", + "parallelism.default=1" + }; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines).hasSize(expectedResultLines); + assertThat(lines).doesNotContain("parallelism.default: 1"); + } + + @Test + void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception { + int expectedResultLines = 13; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), + String.valueOf(expectedResultLines), + "-rmKV", + "parallelism.default=2" + }; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines).hasSize(expectedResultLines); + assertThat(lines).contains("parallelism.default: 1"); + } + + @Test + void testGetConfigurationReplaceKeyValue() throws Exception { + int expectedResultLines = 13; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), + String.valueOf(expectedResultLines), + "-repKV", + "parallelism.default,1,2" + }; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines).hasSize(expectedResultLines); + assertThat(lines).doesNotContain("parallelism.default: 1"); + assertThat(lines).contains("parallelism.default: 
2"); + } + + @Test + void testGetConfigurationReplaceKeyValueNotMatchingValue() throws Exception { + int expectedResultLines = 13; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), + String.valueOf(expectedResultLines), + "-repKV", + "parallelism.default,2,3" + }; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines).hasSize(expectedResultLines); + assertThat(lines).doesNotContain("parallelism.default: 3"); + } + private static Map<String, String> parseAndAssertDynamicParameters( String dynamicParametersStr) { Set<String> expectedDynamicParameters = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfiguration.java new file mode 100644 index 00000000000..f454df81492 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfiguration.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.List; +import java.util.Properties; + +/** + * Configuration class which contains the parsed command line arguments for the {@link + * ClusterEntrypoint}. + */ +public class ModifiableClusterConfiguration { + + private final boolean flattenConfig; + + private final String configDir; + + private final Properties dynamicProperties; + + private final Properties removeKeyValues; + + private final List<String> removeKeys; + + private final List<Tuple3<String, String, String>> replaceKeyValues; + + public ModifiableClusterConfiguration( + boolean flattenConfig, + String configDir, + Properties dynamicProperties, + Properties removeKeyValues, + List<String> removeKeys, + List<Tuple3<String, String, String>> replaceKeyValues) { + this.flattenConfig = flattenConfig; + this.configDir = configDir; + this.dynamicProperties = dynamicProperties; + this.removeKeyValues = removeKeyValues; + this.removeKeys = removeKeys; + this.replaceKeyValues = replaceKeyValues; + } + + public boolean flattenConfig() { + return flattenConfig; + } + + public Properties getRemoveKeyValues() { + return removeKeyValues; + } + + public List<String> getRemoveKeys() { + return removeKeys; + } + + public List<Tuple3<String, String, String>> getReplaceKeyValues() { + return replaceKeyValues; + } + + public String getConfigDir() { + return configDir; + } + + public Properties getDynamicProperties() { + return dynamicProperties; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactory.java new file mode 100644 index 00000000000..3b82ee54199 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.FLATTEN_CONFIG_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.REMOVE_KEY_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.REMOVE_KEY_VALUE_OPTION; +import static org.apache.flink.runtime.entrypoint.parser.ConfigurationCommandLineOptions.REPLACE_KEY_VALUE_OPTION; + +/** A class can be used to extract the configuration from command line and modify it. 
*/ +public class ModifiableClusterConfigurationParserFactory + implements ParserResultFactory<ModifiableClusterConfiguration> { + + public static Options options() { + final Options options = new Options(); + options.addOption(CONFIG_DIR_OPTION); + options.addOption(REMOVE_KEY_OPTION); + options.addOption(REMOVE_KEY_VALUE_OPTION); + options.addOption(REPLACE_KEY_VALUE_OPTION); + options.addOption(DYNAMIC_PROPERTY_OPTION); + options.addOption(FLATTEN_CONFIG_OPTION); + + return options; + } + + @Override + public Options getOptions() { + return options(); + } + + @Override + public ModifiableClusterConfiguration createResult(@Nonnull CommandLine commandLine) { + String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt()); + + Properties dynamicProperties = + commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); + + List<String> removeKeyList = new ArrayList<>(); + String[] removeKeys = commandLine.getOptionValues(REMOVE_KEY_OPTION.getOpt()); + if (removeKeys != null) { + removeKeyList = Arrays.asList(removeKeys); + } + + Properties removeKeyValues = + commandLine.getOptionProperties(REMOVE_KEY_VALUE_OPTION.getOpt()); + + List<Tuple3<String, String, String>> replaceKeyValueList = new ArrayList<>(); + String[] replaceKeyValues = commandLine.getOptionValues(REPLACE_KEY_VALUE_OPTION.getOpt()); + if (replaceKeyValues != null) { + for (int i = 0; i < replaceKeyValues.length; i += 3) { + replaceKeyValueList.add( + new Tuple3<>( + replaceKeyValues[i], + replaceKeyValues[i + 1], + replaceKeyValues[i + 2])); + } + } + + return new ModifiableClusterConfiguration( + commandLine.hasOption(FLATTEN_CONFIG_OPTION), + configDir, + dynamicProperties, + removeKeyValues, + removeKeyList, + replaceKeyValueList); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ConfigurationCommandLineOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ConfigurationCommandLineOptions.java new file mode 
100644 index 00000000000..c7195ed6516 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/ConfigurationCommandLineOptions.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.parser; + +import org.apache.flink.annotation.Internal; + +import org.apache.commons.cli.Option; + +/** Represents the set of command-line options related to update and get configuration. 
*/ +@Internal +public class ConfigurationCommandLineOptions { + + public static final Option REPLACE_KEY_VALUE_OPTION = + Option.builder("repKV") + .argName("key,oldValue,newValue") + .longOpt("replaceKeyValue") + .numberOfArgs(3) + .valueSeparator(',') + .desc( + "Replace the specified key's value with a new one if it matches the old value.") + .build(); + + public static final Option REMOVE_KEY_VALUE_OPTION = + Option.builder("rmKV") + .argName("key=value") + .longOpt("removeKeyValue") + .numberOfArgs(2) + .valueSeparator('=') + .desc("Remove the specified key-value pairs if it matches the old value.") + .build(); + + public static final Option REMOVE_KEY_OPTION = + Option.builder("rmKey") + .argName("Key") + .longOpt("removeKey") + .hasArg(true) + .desc("Key to remove from the configuration.") + .build(); + + public static final Option FLATTEN_CONFIG_OPTION = + Option.builder("flatten") + .argName("flatten configuration") + .longOpt("flattenConfig") + .hasArg(false) + .desc( + "If present, the configuration will be output in a flattened format instead of nested YAML.") + .build(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java index 4737dd2760b..1e1e26ec433 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.util; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.GlobalConfiguration; @@ -26,6 +28,8 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.entrypoint.ClusterConfiguration; import 
org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.ModifiableClusterConfiguration; +import org.apache.flink.runtime.entrypoint.ModifiableClusterConfigurationParserFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.util.MathUtils; @@ -33,6 +37,10 @@ import org.apache.flink.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Properties; +import java.util.Set; + import static org.apache.flink.util.MathUtils.checkedDownCast; /** @@ -144,4 +152,64 @@ public class ConfigurationParserUtils { return GlobalConfiguration.loadConfiguration( clusterConfiguration.getConfigDir(), dynamicProperties); } + + public static List<String> loadAndModifyConfiguration(String[] args, String cmdLineSyntax) + throws FlinkParseException { + final CommandLineParser<ModifiableClusterConfiguration> commandLineParser = + new CommandLineParser<>(new ModifiableClusterConfigurationParserFactory()); + + final ModifiableClusterConfiguration modifiableClusterConfiguration; + try { + modifiableClusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse the command line options.", e); + commandLineParser.printHelp(cmdLineSyntax); + throw e; + } + + final Configuration dynamicProperties = + ConfigurationUtils.createConfiguration( + modifiableClusterConfiguration.getDynamicProperties()); + // 1. Load configuration and append dynamic properties to configuration. + Configuration configuration = + GlobalConfiguration.loadConfiguration( + modifiableClusterConfiguration.getConfigDir(), dynamicProperties); + + // 2. Replace the specified key's value with a new one if it matches the old value.
+ List<Tuple3<String, String, String>> replaceKeyValues = + modifiableClusterConfiguration.getReplaceKeyValues(); + replaceKeyValues.forEach( + tuple3 -> { + String key = tuple3.f0; + String oldValue = tuple3.f1; + String newValue = tuple3.f2; + if (oldValue.equals( + configuration.get( + ConfigOptions.key(key).stringType().noDefaultValue()))) { + configuration.setString(key, newValue); + } + }); + + // 3. Remove each specified key only if its current value equals the given value. + Properties removeKeyValues = modifiableClusterConfiguration.getRemoveKeyValues(); + final Set<String> propertyNames = removeKeyValues.stringPropertyNames(); + + for (String propertyName : propertyNames) { + if (removeKeyValues + .getProperty(propertyName) + .equals( + configuration.getString( + ConfigOptions.key(propertyName) + .stringType() + .noDefaultValue()))) { + configuration.removeKey(propertyName); + } + } + + // 4. Remove the specified keys unconditionally, whatever their current value. + List<String> removeKeys = modifiableClusterConfiguration.getRemoveKeys(); + removeKeys.forEach(configuration::removeKey); + return ConfigurationUtils.convertConfigToWritableLines( + configuration, modifiableClusterConfiguration.flattenConfig()); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java index ed7e8fa62fc..443e2814821 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/BashJavaUtils.java @@ -61,12 +61,13 @@ public class BashJavaUtils { private static List<String> runCommand(Command command, String[] commandArgs) throws FlinkException { - Configuration configuration = FlinkConfigLoader.loadConfiguration(commandArgs); switch (command) { case GET_TM_RESOURCE_PARAMS: - return getTmResourceParams(configuration); + return getTmResourceParams(FlinkConfigLoader.loadConfiguration(commandArgs)); case
GET_JM_RESOURCE_PARAMS: - return getJmResourceParams(configuration); + return getJmResourceParams(FlinkConfigLoader.loadConfiguration(commandArgs)); + case UPDATE_AND_GET_FLINK_CONFIGURATION: + return FlinkConfigLoader.loadAndModifyConfiguration(commandArgs); default: // unexpected, Command#valueOf should fail if a unknown command is passed in throw new RuntimeException("Unexpected, something is wrong."); @@ -175,6 +176,9 @@ public class BashJavaUtils { GET_TM_RESOURCE_PARAMS, /** Get JVM parameters and dynamic configs of job manager resources. */ - GET_JM_RESOURCE_PARAMS + GET_JM_RESOURCE_PARAMS, + + /** Update and get configuration from conf file and dynamic configs of the FLINK cluster. */ + UPDATE_AND_GET_FLINK_CONFIGURATION } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java index 4f18c04e64e..111a4e74a42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/bash/FlinkConfigLoader.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.util.bash; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; +import org.apache.flink.runtime.entrypoint.ModifiableClusterConfigurationParserFactory; import org.apache.flink.runtime.util.ConfigurationParserUtils; import org.apache.flink.util.FlinkException; @@ -36,22 +37,27 @@ import java.util.List; */ public class FlinkConfigLoader { - private static final Options CMD_OPTIONS = ClusterConfigurationParserFactory.options(); - public static Configuration loadConfiguration(String[] args) throws FlinkException { return ConfigurationParserUtils.loadCommonConfiguration( - filterCmdArgs(args), BashJavaUtils.class.getSimpleName()); + filterCmdArgs(args, ClusterConfigurationParserFactory.options()), + 
BashJavaUtils.class.getSimpleName()); + } + + public static List<String> loadAndModifyConfiguration(String[] args) throws FlinkException { + return ConfigurationParserUtils.loadAndModifyConfiguration( + filterCmdArgs(args, ModifiableClusterConfigurationParserFactory.options()), + BashJavaUtils.class.getSimpleName()); } - private static String[] filterCmdArgs(String[] args) { + private static String[] filterCmdArgs(String[] args, Options options) { final List<String> filteredArgs = new ArrayList<>(); final Iterator<String> iter = Arrays.asList(args).iterator(); while (iter.hasNext()) { String token = iter.next(); - if (CMD_OPTIONS.hasOption(token)) { + if (options.hasOption(token)) { filteredArgs.add(token); - if (CMD_OPTIONS.getOption(token).hasArg() && iter.hasNext()) { + if (options.getOption(token).hasArg() && iter.hasNext()) { filteredArgs.add(iter.next()); } } else if (token.startsWith("-D")) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactoryTest.java new file mode 100644 index 00000000000..05659e900e7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ModifiableClusterConfigurationParserFactoryTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for the {@link ModifiableClusterConfigurationParserFactory}. */ +class ModifiableClusterConfigurationParserFactoryTest { + + private static final CommandLineParser<ModifiableClusterConfiguration> commandLineParser = + new CommandLineParser<>(new ModifiableClusterConfigurationParserFactory()); + + @Test + void testModifiableClusterConfigurationParsing() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String key = "key"; + final String value = "value"; + final String newValue = "value2"; + final String[] args = { + "--configDir", + configDir, + "--removeKey", + key, + String.format("-D%s=%s", key, value), + "--removeKeyValue", + String.format("%s=%s", key, value), + "--replaceKeyValue", + String.format("%s,%s,%s", key, value, newValue), + "--flattenConfig" + }; + + ModifiableClusterConfiguration modifiableClusterConfiguration = + commandLineParser.parse(args); + + assertThat(modifiableClusterConfiguration.getConfigDir()).isEqualTo(configDir); + + Properties dynamicProperties = modifiableClusterConfiguration.getDynamicProperties(); + assertThat(dynamicProperties).containsEntry(key, value); + + List<String> removeKeys = 
modifiableClusterConfiguration.getRemoveKeys(); + assertThat(removeKeys).containsExactly(key); + + Properties removeKeyValues = modifiableClusterConfiguration.getRemoveKeyValues(); + assertThat(removeKeyValues).containsEntry(key, value); + + List<Tuple3<String, String, String>> replaceKeyValues = + modifiableClusterConfiguration.getReplaceKeyValues(); + assertThat(replaceKeyValues).containsExactly(Tuple3.of(key, value, newValue)); + + assertThat(modifiableClusterConfiguration.flattenConfig()).isTrue(); + } + + @Test + void testOnlyRequiredArguments() throws FlinkParseException { + final String configDir = "/foo/bar"; + final String[] args = {"--configDir", configDir}; + + ModifiableClusterConfiguration modifiableClusterConfiguration = + commandLineParser.parse(args); + + assertThat(modifiableClusterConfiguration.getConfigDir()).isEqualTo(configDir); + } + + @Test + void testMissingRequiredArgument() { + final String[] args = {}; + + assertThatThrownBy(() -> commandLineParser.parse(args)) + .isInstanceOf(FlinkParseException.class); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java index 763a994cf47..076a5f367c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/bash/FlinkConfigLoaderTest.java @@ -37,6 +37,7 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import static org.apache.flink.configuration.ConfigOptions.key; import static org.assertj.core.api.Assertions.assertThat; @@ -88,6 +89,17 @@ public class FlinkConfigLoaderTest { verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE); } + @TestTemplate + void testloadAndModifyConfigurationConfigDirLongOpt() throws Exception { + String[] args = 
{"--configDir", confDir.toFile().getAbsolutePath()}; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE); + } else { + assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE); + } + } + @TestTemplate void testLoadConfigurationConfigDirShortOpt() throws Exception { String[] args = {"-c", confDir.toFile().getAbsolutePath()}; @@ -95,6 +107,17 @@ public class FlinkConfigLoaderTest { verifyConfiguration(configuration, TEST_CONFIG_KEY, TEST_CONFIG_VALUE); } + @TestTemplate + void testloadAndModifyConfigurationConfigDirShortOpt() throws Exception { + String[] args = {"-c", confDir.toFile().getAbsolutePath()}; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE); + } else { + assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE); + } + } + @TestTemplate void testLoadConfigurationDynamicPropertyWithSpace() throws Exception { String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), "-D", "key=value"}; @@ -102,6 +125,19 @@ public class FlinkConfigLoaderTest { verifyConfiguration(configuration, "key", "value"); } + @TestTemplate + void testloadAndModifyConfigurationDynamicPropertyWithSpace() throws Exception { + String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), "-D", "key=value"}; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE, "key: value"); + } else { + assertThat(list) + .containsExactlyInAnyOrder( + TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: value"); + } + } + @TestTemplate void testLoadConfigurationDynamicPropertyWithoutSpace() throws Exception { String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), "-Dkey=value"}; @@ -109,6 
+145,19 @@ public class FlinkConfigLoaderTest { verifyConfiguration(configuration, "key", "value"); } + @TestTemplate + void testloadAndModifyConfigurationDynamicPropertyWithoutSpace() throws Exception { + String[] args = {"--configDir", confDir.toFile().getAbsolutePath(), "-Dkey=value"}; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE, "key: value"); + } else { + assertThat(list) + .containsExactlyInAnyOrder( + TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: value"); + } + } + @TestTemplate void testLoadConfigurationIgnoreUnknownToken() throws Exception { String[] args = { @@ -124,6 +173,159 @@ public class FlinkConfigLoaderTest { verifyConfiguration(configuration, "key", "value"); } + @TestTemplate + void testloadAndModifyConfigurationIgnoreUnknownToken() throws Exception { + String[] args = { + "unknown", + "-u", + "--configDir", + confDir.toFile().getAbsolutePath(), + "--unknown", + "-Dkey=value" + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE, "key: value"); + } else { + assertThat(list) + .containsExactlyInAnyOrder( + TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, "key: value"); + } + } + + @TestTemplate + void testloadAndModifyConfigurationRemoveKeysMatched() throws Exception { + String key = "key"; + + String[] args = { + "--configDir", + confDir.toFile().getAbsolutePath(), + String.format("-D%s=value", key), + "--removeKey", + key + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE); + } else { + assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE); + } + } + + @TestTemplate + void testloadAndModifyConfigurationRemoveKeysNotMatched() throws Exception { + String 
key = "key"; + String value = "value"; + String removeKey = "removeKey"; + + String[] args = { + "--configDir", + confDir.toFile().getAbsolutePath(), + String.format("-D%s=%s", key, value), + "--removeKey", + removeKey + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list) + .containsExactly("test:", " key: " + TEST_CONFIG_VALUE, key + ": " + value); + } else { + assertThat(list) + .containsExactlyInAnyOrder( + TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, key + ": " + value); + } + } + + @TestTemplate + void testloadAndModifyConfigurationRemoveKeyValuesMatched() throws Exception { + String removeKey = "removeKey"; + String removeValue = "removeValue"; + + String[] args = { + "--configDir", + confDir.toFile().getAbsolutePath(), + String.format("-D%s=%s", removeKey, removeValue), + "--removeKeyValue", + String.format("%s=%s", removeKey, removeValue) + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE); + } else { + assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE); + } + } + + @TestTemplate + void testloadAndModifyConfigurationRemoveKeyValuesNotMatched() throws Exception { + String removeKey = "removeKey"; + String removeValue = "removeValue"; + String nonExistentValue = "nonExistentValue"; + + String[] args = { + "--configDir", + confDir.toFile().getAbsolutePath(), + String.format("-D%s=%s", removeKey, removeValue), + "--removeKeyValue", + String.format("%s=%s", removeKey, nonExistentValue) + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list) + .containsExactlyInAnyOrder( + "test:", " key: " + TEST_CONFIG_VALUE, removeKey + ": " + removeValue); + } else { + assertThat(list) + .containsExactlyInAnyOrder( + TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE, + removeKey + ": " + 
removeValue); + } + } + + @TestTemplate + void testloadAndModifyConfigurationReplaceKeyValuesMatched() throws Exception { + String newValue = "newValue"; + + String[] args = { + "--configDir", + confDir.toFile().getAbsolutePath(), + "--replaceKeyValue", + String.format("%s,%s,%s", TEST_CONFIG_KEY, TEST_CONFIG_VALUE, newValue) + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + newValue); + } else { + assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " + newValue); + } + } + + @TestTemplate + void testloadAndModifyConfigurationReplaceKeyValuesNotMatched() throws Exception { + String nonExistentValue = "nonExistentValue"; + String newValue = "newValue"; + + String[] args = { + "--configDir", + confDir.toFile().getAbsolutePath(), + "--replaceKeyValue", + String.format("%s,%s,%s", TEST_CONFIG_KEY, nonExistentValue, newValue) + }; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + if (standardYaml) { + assertThat(list).containsExactly("test:", " key: " + TEST_CONFIG_VALUE); + } else { + assertThat(list).containsExactlyInAnyOrder(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE); + } + } + + @TestTemplate + void testloadAndModifyConfigurationWithFlatten() throws Exception { + String[] args = {"-c", confDir.toFile().getAbsolutePath(), "-flatten"}; + List<String> list = FlinkConfigLoader.loadAndModifyConfiguration(args); + assertThat(list).containsExactly(TEST_CONFIG_KEY + ": " + TEST_CONFIG_VALUE); + } + private void verifyConfiguration(Configuration config, String key, String expectedValue) { ConfigOption<String> option = key(key).stringType().noDefaultValue(); assertThat(config.get(option)).isEqualTo(expectedValue);