This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9721ce835f5 [FLINK-33577][dist] Change the default config file to config.yaml in flink-dist.
9721ce835f5 is described below

commit 9721ce835f5a7f28f2ad187346e009633307097b
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Wed Aug 23 22:48:30 2023 +0800

    [FLINK-33577][dist] Change the default config file to config.yaml in flink-dist.
    
    This closes #24177.
---
 .../generated/kubernetes_config_configuration.html |   2 +-
 .../shortcodes/generated/python_configuration.html |   2 +-
 .../generated/rocksdb_configuration.html           |   2 +-
 .../generated/state_backend_rocksdb_section.html   |   2 +-
 .../api/java/hadoop/mapred/utils/HadoopUtils.java  |   2 +-
 .../common/restartstrategy/RestartStrategies.java  |   2 +-
 .../flink/configuration/ConfigConstants.java       |   4 +-
 .../configuration/ResourceManagerOptions.java      |   2 +-
 flink-dist/src/main/assemblies/bin.xml             |   2 +-
 flink-dist/src/main/flink-bin/bin/config.sh        |   8 +-
 flink-dist/src/main/resources/config.yaml          | 298 ++++++++++++++++++++
 flink-dist/src/main/resources/flink-conf.yaml      | 311 ---------------------
 .../org/apache/flink/dist/BashJavaUtilsITCase.java |  51 +++-
 .../flink/tests/util/flink/FlinkDistribution.java  |   6 +-
 flink-end-to-end-tests/test-scripts/common.sh      |  18 +-
 .../test-scripts/common_yarn_docker.sh             |   4 +-
 .../test-scripts/test_pyflink.sh                   |  10 +-
 .../test_yarn_application_kerberos_docker.sh       |   2 +-
 .../test-scripts/test_yarn_job_kerberos_docker.sh  |   2 +-
 .../parquet/ParquetVectorizedInputFormat.java      |   2 +-
 .../java/org/apache/flink/api/java/DataSet.java    |   2 +-
 .../configuration/KubernetesConfigOptions.java     |   2 +-
 .../decorators/FlinkConfMountDecorator.java        |   2 +-
 .../parameters/KubernetesParameters.java           |   2 +-
 flink-python/dev/dev-requirements.txt              |   1 +
 flink-python/pyflink/common/configuration.py       |  13 +-
 flink-python/pyflink/common/restart_strategy.py    |   2 +-
 flink-python/pyflink/datastream/state_backend.py   |  16 +-
 .../datastream/stream_execution_environment.py     |   2 +-
 flink-python/pyflink/pyflink_gateway_server.py     |  68 +++--
 flink-python/pyflink/table/table_config.py         |   6 +-
 flink-python/pyflink/table/table_environment.py    |  34 ++-
 flink-python/setup.py                              |   4 +-
 .../java/org/apache/flink/python/PythonConfig.java |   2 +-
 .../org/apache/flink/python/PythonOptions.java     |   2 +-
 .../RestartBackoffTimeStrategyFactoryLoader.java   |   4 +-
 .../runtime/state/CheckpointStorageLoader.java     |   4 +-
 .../runtime/state/filesystem/FsStateBackend.java   |   2 +-
 .../runtime/state/memory/MemoryStateBackend.java   |   2 +-
 .../state/EmbeddedRocksDBStateBackend.java         |   4 +-
 .../contrib/streaming/state/RocksDBOptions.java    |   2 +-
 .../streaming/state/RocksDBStateBackend.java       |   7 +-
 .../state/RocksDBStateBackendConfigTest.java       |   6 +-
 .../environment/StreamExecutionEnvironment.java    |   2 +-
 .../gateway/service/context/DefaultContext.java    |   4 +-
 .../rest/util/SqlGatewayRestEndpointTestUtils.java |   2 +-
 .../service/context/SessionContextTest.java        |   2 +-
 .../org/apache/flink/table/api/TableConfig.java    |   4 +-
 .../container/FlinkContainersSettings.java         |   4 +-
 .../testframe/container/FlinkImageBuilder.java     |  15 +-
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  10 +-
 .../src/main/java/org/apache/flink/yarn/Utils.java |   2 +-
 52 files changed, 510 insertions(+), 456 deletions(-)
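
In practical terms, flink-dist now ships conf/config.yaml, a standard
(optionally nested) YAML file, in place of the legacy flat-format
conf/flink-conf.yaml. Reading options on the Java side is unchanged, since
callers go through typed ConfigOptions. A minimal sketch, assuming a conf
directory that contains the new config.yaml (the path and class name are
illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;
    import org.apache.flink.configuration.GlobalConfiguration;

    public class LoadConfigSketch {
        public static void main(String[] args) {
            // Loads the configuration file found in the given conf directory;
            // after this commit the file shipped by flink-dist is config.yaml.
            Configuration conf = GlobalConfiguration.loadConfiguration("/opt/flink/conf");

            // Typed ConfigOptions hide whether the file used flat keys or
            // nested YAML.
            int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
            System.out.println("parallelism.default = " + parallelism);
        }
    }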

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html 
b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 7bbdfd5e404..86d147f6c00 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -90,7 +90,7 @@
             <td><h5>kubernetes.flink.conf.dir</h5></td>
             <td style="word-wrap: break-word;">"/opt/flink/conf"</td>
             <td>String</td>
-            <td>The flink conf directory that will be mounted in pod. The 
flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten 
from config map.</td>
+            <td>The flink conf directory that will be mounted in the pod. The 
config.yaml, log4j.properties, and logback.xml in this path will be overwritten 
from the config map.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.flink.log.dir</h5></td>
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html 
b/docs/layouts/shortcodes/generated/python_configuration.html
index 60ef6a9676e..d99c1f2a3b9 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -18,7 +18,7 @@
             <td><h5>python.client.executable</h5></td>
             <td style="word-wrap: break-word;">"python"</td>
             <td>String</td>
-            <td>The path of the Python interpreter used to launch the Python 
process when submitting the Python jobs via "flink run" or compiling the 
Java/Scala jobs containing Python UDFs. Equivalent to the command line option 
"-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The 
priority is as following: <br />1. the configuration 'python.client.executable' 
defined in the source code(Only used in Flink Java SQL/Table API job call 
Python UDF);<br />2. the command li [...]
+            <td>The path of the Python interpreter used to launch the Python 
process when submitting Python jobs via "flink run" or compiling Java/Scala 
jobs containing Python UDFs. Equivalent to the command line option 
"-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The 
priority is as follows: <br />1. the configuration 'python.client.executable' 
defined in the source code (only used when a Flink Java SQL/Table API job calls 
a Python UDF);<br />2. the command li [...]
         </tr>
         <tr>
             <td><h5>python.executable</h5></td>
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index 5cd25cda3f8..b6ad67234e0 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -30,7 +30,7 @@
             <td><h5>state.backend.rocksdb.memory.fixed-per-tm</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
-            <td>The fixed total amount of memory, shared among all RocksDB 
instances per Task Manager (cluster-level option). This option only takes 
effect if neither 'state.backend.rocksdb.memory.managed' nor 
'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is 
configured then each RocksDB column family state has its own memory caches (as 
controlled by the column family options). The relevant options for the shared 
resources (e.g. write-buffer-ratio) can be set o [...]
+            <td>The fixed total amount of memory, shared among all RocksDB 
instances per Task Manager (cluster-level option). This option only takes 
effect if neither 'state.backend.rocksdb.memory.managed' nor 
'state.backend.rocksdb.memory.fixed-per-slot' is configured. If neither is 
configured, then each RocksDB column family state has its own memory caches (as 
controlled by the column family options). The relevant options for the shared 
resources (e.g. write-buffer-ratio) can be set o [...]
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td>
diff --git 
a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html 
b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
index 2dd92d7c0e0..7d917bb7ae8 100644
--- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
@@ -18,7 +18,7 @@
             <td><h5>state.backend.rocksdb.memory.fixed-per-tm</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
-            <td>The fixed total amount of memory, shared among all RocksDB 
instances per Task Manager (cluster-level option). This option only takes 
effect if neither 'state.backend.rocksdb.memory.managed' nor 
'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is 
configured then each RocksDB column family state has its own memory caches (as 
controlled by the column family options). The relevant options for the shared 
resources (e.g. write-buffer-ratio) can be set o [...]
+            <td>The fixed total amount of memory, shared among all RocksDB 
instances per Task Manager (cluster-level option). This option only takes 
effect if neither 'state.backend.rocksdb.memory.managed' nor 
'state.backend.rocksdb.memory.fixed-per-slot' is configured. If neither is 
configured, then each RocksDB column family state has its own memory caches (as 
controlled by the column family options). The relevant options for the shared 
resources (e.g. write-buffer-ratio) can be set o [...]
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td>
diff --git 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 0cf31d5cabc..67a53405d24 100644
--- 
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ 
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -55,7 +55,7 @@ public final class HadoopUtils {
 
     /**
      * Returns a new Hadoop Configuration object using the path to the hadoop 
conf configured in the
-     * main configuration (flink-conf.yaml). This method is public because its 
being used in the
+     * main configuration (config.yaml). This method is public because it's 
being used in the
      * HadoopDataSource.
      *
      * @param flinkConfiguration Flink configuration object
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index 191d89d957f..b099d944892 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -503,7 +503,7 @@ public class RestartStrategies {
     /**
      * Restart strategy configuration that could be used by jobs to use 
cluster level restart
      * strategy. Useful especially when one has a custom implementation of 
restart strategy set via
-     * flink-conf.yaml.
+     * config.yaml.
      */
     @PublicEvolving
     public static final class FallbackRestartStrategyConfiguration
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ad7857af06d..b7751abd07b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -391,7 +391,7 @@ public final class ConfigConstants {
     /**
      * Prefix for passing custom environment variables to Flink's master 
process. For example for
      * passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
-     * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the 
flink-conf.yaml.
+     * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the 
config.yaml.
      *
      * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
      */
@@ -482,7 +482,7 @@ public final class ConfigConstants {
     /**
      * Prefix for passing custom environment variables to Flink's 
ApplicationMaster (JobManager).
      * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
-     * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the 
flink-conf.yaml.
+     * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the 
config.yaml.
      *
      * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}.
      */
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index e846e0a114e..10283e68657 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -276,7 +276,7 @@ public class ResourceManagerOptions {
     /**
      * Prefix for passing custom environment variables to Flink's master 
process. For example for
      * passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
-     * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the 
flink-conf.yaml.
+     * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the 
config.yaml.
      */
     public static final String CONTAINERIZED_MASTER_ENV_PREFIX = 
"containerized.master.env.";
 
diff --git a/flink-dist/src/main/assemblies/bin.xml 
b/flink-dist/src/main/assemblies/bin.xml
index 9753e1db003..3ac51f4562f 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -123,7 +123,7 @@ under the License.
 
                <!-- copy the config file -->
                <file>
-                       <source>src/main/resources/flink-conf.yaml</source>
+                       <source>src/main/resources/config.yaml</source>
                        <outputDirectory>conf</outputDirectory>
                        <fileMode>0644</fileMode>
                </file>
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index d90e4362f7e..2959f9c5ece 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -112,12 +112,12 @@ readFromConfig() {
 }
 
 
########################################################################################################################
-# DEFAULT CONFIG VALUES: These values will be used when nothing has been 
specified in conf/flink-conf.yaml
+# DEFAULT CONFIG VALUES: These values will be used when nothing has been 
specified in conf/config.yaml
 # -or- the respective environment variables are not set.
 
########################################################################################################################
 
 # WARNING !!!, these values are only used if nothing else is specified in
-# conf/flink-conf.yaml
+# conf/config.yaml
 
 DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid 
files to
 DEFAULT_ENV_LOG_MAX=10                              # Maximum number of old 
log files to keep
@@ -134,7 +134,7 @@ DEFAULT_HADOOP_CONF_DIR=""                          # 
Hadoop Configuration Direc
 DEFAULT_HBASE_CONF_DIR=""                           # HBase Configuration 
Directory, if necessary
 
 
########################################################################################################################
-# CONFIG KEYS: The default values can be overwritten by the following keys in 
conf/flink-conf.yaml
+# CONFIG KEYS: The default values can be overwritten by the following keys in 
conf/config.yaml
 
########################################################################################################################
 
 KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
@@ -351,7 +351,7 @@ if [ -z "${HIGH_AVAILABILITY}" ]; then
 fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.
-# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/config.yaml with keys
 # JobManagerOptions#TOTAL_PROCESS_MEMORY and 
TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
 if [ -z "${JVM_ARGS}" ]; then
     JVM_ARGS=""
diff --git a/flink-dist/src/main/resources/config.yaml 
b/flink-dist/src/main/resources/config.yaml
new file mode 100644
index 00000000000..08aace171ea
--- /dev/null
+++ b/flink-dist/src/main/resources/config.yaml
@@ -0,0 +1,298 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# These parameters are required for Java 17 support.
+# They can be safely removed when using Java 8/11.
+env:
+  java:
+    opts:
+      all: --add-exports=java.base/sun.net.util=ALL-UNNAMED 
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNA [...]
+
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager:
+  # The host interface the JobManager will bind to. By default, this is 
localhost, and will prevent
+  # the JobManager from communicating outside the machine/container it is 
running on.
+  # On YARN this setting will be ignored if it is set to 'localhost', 
defaulting to 0.0.0.0.
+  # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+  #
+  # To enable this, set the bind-host address to one that has access to an 
outside facing network
+  # interface, such as 0.0.0.0.
+  bind-host: localhost
+  rpc:
+    # The external address of the host on which the JobManager runs and can be
+    # reached by the TaskManagers and any clients which want to connect. This 
setting
+    # is only used in Standalone mode and may be overwritten on the JobManager 
side
+    # by specifying the --host <hostname> parameter of the bin/jobmanager.sh 
executable.
+    # In high availability mode, if you use the bin/start-cluster.sh script 
and set up
+    # the conf/masters file, this will be taken care of automatically. Yarn
+    # automatically configures the host name based on the hostname of the node 
where the
+    # JobManager runs.
+    address: localhost
+    # The RPC port where the JobManager is reachable.
+    port: 6123
+  memory:
+    process:
+      # The total process memory size for the JobManager.
+      # Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
+      size: 1600m
+  execution:
+    # The failover strategy, i.e., how the job computation recovers from task 
failures.
+    # Only restart tasks that may have been affected by the task failure, 
which typically includes
+    # downstream tasks and potentially upstream tasks if their produced data 
is no longer available for consumption.
+    failover-strategy: region
+
+taskmanager:
+  # The host interface the TaskManager will bind to. By default, this is 
localhost, and will prevent
+  # the TaskManager from communicating outside the machine/container it is 
running on.
+  # On YARN this setting will be ignored if it is set to 'localhost', 
defaulting to 0.0.0.0.
+  # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+  #
+  # To enable this, set the bind-host address to one that has access to an 
outside facing network
+  # interface, such as 0.0.0.0.
+  bind-host: localhost
+  # The address of the host on which the TaskManager runs and can be reached 
by the JobManager and
+  # other TaskManagers. If not specified, the TaskManager will try different 
strategies to identify
+  # the address.
+  #
+  # Note this address needs to be reachable by the JobManager and must forward 
traffic to one of
+  # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+  #
+  # Note also that unless all TaskManagers are running on the same machine, 
this address needs to be
+  # configured separately for each TaskManager.
+  host: localhost
+  # The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
+  numberOfTaskSlots: 1
+  memory:
+    process:
+      # The total process memory size for the TaskManager.
+      #
+      # Note this accounts for all memory usage within the TaskManager 
process, including JVM metaspace and other overhead.
+      # To exclude JVM metaspace and overhead, please use total Flink memory 
size instead of 'taskmanager.memory.process.size'.
+      # It is not recommended to set both 'taskmanager.memory.process.size' 
and Flink memory.
+      size: 1728m
+
+parallelism:
+  # The parallelism used for programs that did not specify any other 
parallelism.
+  default: 1
+
+# # The default file system scheme and authority.
+# # By default file paths without scheme are interpreted relative to the local
+# # root file system 'file:///'. Use this to override the default and interpret
+# # relative paths relative to a different file system,
+# # for example 'hdfs://mynamenode:12345'
+# fs:
+#   default-scheme: hdfs://mynamenode:12345
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# high-availability:
+#   # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#   type: zookeeper
+#   # The path where metadata for master recovery is persisted. While 
ZooKeeper stores
+#   # the small ground truth for checkpoint and leader election, this location 
stores
+#   # the larger objects, like persisted dataflow graphs.
+#   #
+#   # Must be a durable file system that is accessible from all nodes
+#   # (like HDFS, S3, Ceph, nfs, ...)
+#   storageDir: hdfs:///flink/ha/
+#   zookeeper:
+#     # The list of ZooKeeper quorum peers that coordinate the 
high-availability
+#     # setup. This must be a list of the form:
+#     # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#     quorum: localhost:2181
+#     client:
+#       # ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+#       # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" 
(ZOO_OPEN_ACL_UNSAFE)
+#       # The default value is "open" and it can be changed to "creator" if ZK 
security is enabled
+#       acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when 
execution.checkpointing.interval > 0.
+
+# # Execution checkpointing related parameters. Please refer to 
CheckpointConfig and ExecutionCheckpointingOptions for more details.
+# execution:
+#   checkpointing:
+#     interval: 3min
+#     externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, 
RETAIN_ON_CANCELLATION]
+#     max-concurrent-checkpoints: 1
+#     min-pause: 0
+#     mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+#     timeout: 10min
+#     tolerable-failed-checkpoints: 0
+#     unaligned: false
+
+# state:
+#   backend:
+#     # Supported backends are 'hashmap', 'rocksdb', or the
+#     # <class-name-of-factory>.
+#     type: hashmap
+#     # Flag to enable/disable incremental checkpoints for backends that
+#     # support incremental checkpoints (like the RocksDB state backend).
+#     incremental: false
+#   checkpoints:
+#       # Directory for checkpoints filesystem, when using any of the default 
bundled
+#       # state backends.
+#       dir: hdfs://namenode-host:port/flink-checkpoints
+#   savepoints:
+#       # Default target directory for savepoints, optional.
+#       dir: hdfs://namenode-host:port/flink-savepoints
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+rest:
+  # The address to which the REST client will connect
+  address: localhost
+  # The address that the REST & web server binds to
+  # By default, this is localhost, which prevents the REST & web server from
+  # being able to communicate outside of the machine/container it is running 
on.
+  #
+  # To enable this, set the bind address to one that has access to an 
outside-facing
+  # network interface, such as 0.0.0.0.
+  bind-address: localhost
+  # # The port to which the REST client connects. If rest.bind-port has
+  # # not been specified, then the server will bind to this port as well.
+  # port: 8081
+  # # Port range for the REST and web server to bind to.
+  # bind-port: 8080-8090
+
+# web:
+#   submit:
+#     # Flag to specify whether job submission is enabled from the web-based
+#     # runtime monitor. Uncomment to disable.
+#     enable: false
+#   cancel:
+#     # Flag to specify whether job cancellation is enabled from the web-based
+#     # runtime monitor. Uncomment to disable.
+#     enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# io:
+#   tmp:
+#     # Override the directories for temporary files. If not specified, the
+#     # system-specific Java temporary directory (java.io.tmpdir property) is 
taken.
+#     #
+#     # For framework setups on Yarn, Flink will automatically pick up the
+#     # containers' temp directories without any need for configuration.
+#     #
+#     # Add a delimited list for multiple directories, using the system 
directory
+#     # delimiter (colon ':' on unix) or a comma, e.g.:
+#     # /data1/tmp:/data2/tmp:/data3/tmp
+#     #
+#     # Note: Each directory entry is read from and written to by a different 
I/O
+#     # thread. You can include the same directory multiple times in order to 
create
+#     # multiple I/O threads against that directory. This is for example 
relevant for
+#     # high-throughput RAIDs.
+#     dirs: /tmp
+
+# classloader:
+#   resolve:
+#     # The classloading resolve order. Possible values are 'child-first' 
(Flink's default)
+#     # and 'parent-first' (Java's default).
+#     #
+#     # Child first classloading allows users to use different 
dependency/library
+#     # versions in their application than those in the classpath. Switching 
back
+#     # to 'parent-first' may help with debugging dependency issues.
+#     order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager:
+#   memory:
+#     network:
+#       fraction: 0.1
+#       min: 64mb
+#       max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and 
connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# # The settings below configure how Kerberos credentials are provided. A 
keytab will be used instead of
+# # a ticket cache if the keytab path and principal are set.
+# security:
+#   kerberos:
+#     login:
+#       use-ticket-cache: true
+#       keytab: /path/to/kerberos/keytab
+#       principal: flink-user
+#       # The configuration below defines which JAAS login contexts
+#       contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# zookeeper:
+#   sasl:
+#     # The configurations below apply if the ZK ensemble is configured for 
security
+#     #
+#     # Override below configuration to provide custom ZK service name if 
configured
+#     # zookeeper.sasl.service-name: zookeeper
+#     #
+#     # The configuration below must match one of the values set in 
"security.kerberos.login.contexts"
+#     login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh 
(start|stop)
+#
+# jobmanager:
+#   archive:
+#     fs:
+#       # Directory to upload completed jobs to. Add this directory to the 
list of
+#       # monitored directories of the HistoryServer as well (see below).
+#       dir: hdfs:///completed-jobs/
+
+# historyserver:
+#   web:
+#     # The address under which the web-based HistoryServer listens.
+#     address: 0.0.0.0
+#     # The port under which the web-based HistoryServer listens.
+#     port: 8082
+#   archive:
+#     fs:
+#       # Comma separated list of directories to monitor for completed jobs.
+#       dir: hdfs:///completed-jobs/
+#       # Interval in milliseconds for refreshing the monitored directories.
+#       refresh-interval: 10000
+
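
For comparison with the deleted flink-conf.yaml below: the nested keys above
resolve to the same flat option keys that the legacy file used, so existing
ConfigOptions keep working. A small sketch of that equivalence (the setString
call stands in for either file format):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;

    public class NestedKeySketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // The legacy flat form
            //     jobmanager.rpc.address: localhost
            // and the nested form used by config.yaml
            //     jobmanager:
            //       rpc:
            //         address: localhost
            // both populate the option key "jobmanager.rpc.address".
            conf.setString(JobManagerOptions.ADDRESS.key(), "localhost");
            System.out.println(conf.get(JobManagerOptions.ADDRESS));
        }
    }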
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
deleted file mode 100644
index b5aa2794dd9..00000000000
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ /dev/null
@@ -1,311 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-
-# These parameters are required for Java 17 support.
-# They can be safely removed when using Java 8/11.
-env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED 
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=java.security.jgss/sun.security.krb5= [...]
-
-#==============================================================================
-# Common
-#==============================================================================
-
-# The external address of the host on which the JobManager runs and can be
-# reached by the TaskManagers and any clients which want to connect. This 
setting
-# is only used in Standalone mode and may be overwritten on the JobManager side
-# by specifying the --host <hostname> parameter of the bin/jobmanager.sh 
executable.
-# In high availability mode, if you use the bin/start-cluster.sh script and 
setup
-# the conf/masters file, this will be taken care of automatically. Yarn
-# automatically configure the host name based on the hostname of the node 
where the
-# JobManager runs.
-
-jobmanager.rpc.address: localhost
-
-# The RPC port where the JobManager is reachable.
-
-jobmanager.rpc.port: 6123
-
-# The host interface the JobManager will bind to. By default, this is 
localhost, and will prevent
-# the JobManager from communicating outside the machine/container it is 
running on.
-# On YARN this setting will be ignored if it is set to 'localhost', defaulting 
to 0.0.0.0.
-# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
-#
-# To enable this, set the bind-host address to one that has access to an 
outside facing network
-# interface, such as 0.0.0.0.
-
-jobmanager.bind-host: localhost
-
-
-# The total process memory size for the JobManager.
-#
-# Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
-
-jobmanager.memory.process.size: 1600m
-
-# The host interface the TaskManager will bind to. By default, this is 
localhost, and will prevent
-# the TaskManager from communicating outside the machine/container it is 
running on.
-# On YARN this setting will be ignored if it is set to 'localhost', defaulting 
to 0.0.0.0.
-# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
-#
-# To enable this, set the bind-host address to one that has access to an 
outside facing network
-# interface, such as 0.0.0.0.
-
-taskmanager.bind-host: localhost
-
-# The address of the host on which the TaskManager runs and can be reached by 
the JobManager and
-# other TaskManagers. If not specified, the TaskManager will try different 
strategies to identify
-# the address.
-#
-# Note this address needs to be reachable by the JobManager and forward 
traffic to one of
-# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
-#
-# Note also that unless all TaskManagers are running on the same machine, this 
address needs to be
-# configured separately for each TaskManager.
-
-taskmanager.host: localhost
-
-# The total process memory size for the TaskManager.
-#
-# Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
-
-taskmanager.memory.process.size: 1728m
-
-# To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
-# It is not recommended to set both 'taskmanager.memory.process.size' and 
Flink memory.
-#
-# taskmanager.memory.flink.size: 1280m
-
-# The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
-
-taskmanager.numberOfTaskSlots: 1
-
-# The parallelism used for programs that did not specify and other parallelism.
-
-parallelism.default: 1
-
-# The default file system scheme and authority.
-# 
-# By default file paths without scheme are interpreted relative to the local
-# root file system 'file:///'. Use this to override the default and interpret
-# relative paths relative to a different file system,
-# for example 'hdfs://mynamenode:12345'
-#
-# fs.default-scheme
-
-#==============================================================================
-# High Availability
-#==============================================================================
-
-# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
-#
-# high-availability.type: zookeeper
-
-# The path where metadata for master recovery is persisted. While ZooKeeper 
stores
-# the small ground truth for checkpoint and leader election, this location 
stores
-# the larger objects, like persisted dataflow graphs.
-# 
-# Must be a durable file system that is accessible from all nodes
-# (like HDFS, S3, Ceph, nfs, ...) 
-#
-# high-availability.storageDir: hdfs:///flink/ha/
-
-# The list of ZooKeeper quorum peers that coordinate the high-availability
-# setup. This must be a list of the form:
-# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
-#
-# high-availability.zookeeper.quorum: localhost:2181
-
-
-# ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
-# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" 
(ZOO_OPEN_ACL_UNSAFE)
-# The default value is "open" and it can be changed to "creator" if ZK 
security is enabled
-#
-# high-availability.zookeeper.client.acl: open
-
-#==============================================================================
-# Fault tolerance and checkpointing
-#==============================================================================
-
-# The backend that will be used to store operator state checkpoints if
-# checkpointing is enabled. Checkpointing is enabled when 
execution.checkpointing.interval > 0.
-#
-# Execution checkpointing related parameters. Please refer to CheckpointConfig 
and ExecutionCheckpointingOptions for more details.
-#
-# execution.checkpointing.interval: 3min
-# execution.checkpointing.externalized-checkpoint-retention: 
[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
-# execution.checkpointing.max-concurrent-checkpoints: 1
-# execution.checkpointing.min-pause: 0
-# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
-# execution.checkpointing.timeout: 10min
-# execution.checkpointing.tolerable-failed-checkpoints: 0
-# execution.checkpointing.unaligned: false
-#
-# Supported backends are 'hashmap', 'rocksdb', or the
-# <class-name-of-factory>.
-#
-# state.backend.type: hashmap
-
-# Directory for checkpoints filesystem, when using any of the default bundled
-# state backends.
-#
-# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
-
-# Default target directory for savepoints, optional.
-#
-# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
-
-# Flag to enable/disable incremental checkpoints for backends that
-# support incremental checkpoints (like the RocksDB state backend). 
-#
-# state.backend.incremental: false
-
-# The failover strategy, i.e., how the job computation recovers from task 
failures.
-# Only restart tasks that may have been affected by the task failure, which 
typically includes
-# downstream tasks and potentially upstream tasks if their produced data is no 
longer available for consumption.
-
-jobmanager.execution.failover-strategy: region
-
-#==============================================================================
-# Rest & web frontend
-#==============================================================================
-
-# The port to which the REST client connects to. If rest.bind-port has
-# not been specified, then the server will bind to this port as well.
-#
-#rest.port: 8081
-
-# The address to which the REST client will connect to
-#
-rest.address: localhost
-
-# Port range for the REST and web server to bind to.
-#
-#rest.bind-port: 8080-8090
-
-# The address that the REST & web server binds to
-# By default, this is localhost, which prevents the REST & web server from
-# being able to communicate outside of the machine/container it is running on.
-#
-# To enable this, set the bind address to one that has access to outside-facing
-# network interface, such as 0.0.0.0.
-#
-rest.bind-address: localhost
-
-# Flag to specify whether job submission is enabled from the web-based
-# runtime monitor. Uncomment to disable.
-
-#web.submit.enable: false
-
-# Flag to specify whether job cancellation is enabled from the web-based
-# runtime monitor. Uncomment to disable.
-
-#web.cancel.enable: false
-
-#==============================================================================
-# Advanced
-#==============================================================================
-
-# Override the directories for temporary files. If not specified, the
-# system-specific Java temporary directory (java.io.tmpdir property) is taken.
-#
-# For framework setups on Yarn, Flink will automatically pick up the
-# containers' temp directories without any need for configuration.
-#
-# Add a delimited list for multiple directories, using the system directory
-# delimiter (colon ':' on unix) or a comma, e.g.:
-#     /data1/tmp:/data2/tmp:/data3/tmp
-#
-# Note: Each directory entry is read from and written to by a different I/O
-# thread. You can include the same directory multiple times in order to create
-# multiple I/O threads against that directory. This is for example relevant for
-# high-throughput RAIDs.
-#
-# io.tmp.dirs: /tmp
-
-# The classloading resolve order. Possible values are 'child-first' (Flink's 
default)
-# and 'parent-first' (Java's default).
-#
-# Child first classloading allows users to use different dependency/library
-# versions in their application than those in the classpath. Switching back
-# to 'parent-first' may help with debugging dependency issues.
-#
-# classloader.resolve-order: child-first
-
-# The amount of memory going to the network stack. These numbers usually need 
-# no tuning. Adjusting them may be necessary in case of an "Insufficient number
-# of network buffers" error. The default min is 64MB, the default max is 1GB.
-# 
-# taskmanager.memory.network.fraction: 0.1
-# taskmanager.memory.network.min: 64mb
-# taskmanager.memory.network.max: 1gb
-
-#==============================================================================
-# Flink Cluster Security Configuration
-#==============================================================================
-
-# Kerberos authentication for various components - Hadoop, ZooKeeper, and 
connectors -
-# may be enabled in four steps:
-# 1. configure the local krb5.conf file
-# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
-# 3. make the credentials available to various JAAS login contexts
-# 4. configure the connector to use JAAS/SASL
-
-# The below configure how Kerberos credentials are provided. A keytab will be 
used instead of
-# a ticket cache if the keytab path and principal are set.
-
-# security.kerberos.login.use-ticket-cache: true
-# security.kerberos.login.keytab: /path/to/kerberos/keytab
-# security.kerberos.login.principal: flink-user
-
-# The configuration below defines which JAAS login contexts
-
-# security.kerberos.login.contexts: Client,KafkaClient
-
-#==============================================================================
-# ZK Security Configuration
-#==============================================================================
-
-# Below configurations are applicable if ZK ensemble is configured for security
-
-# Override below configuration to provide custom ZK service name if configured
-# zookeeper.sasl.service-name: zookeeper
-
-# The configuration below must match one of the values set in 
"security.kerberos.login.contexts"
-# zookeeper.sasl.login-context-name: Client
-
-#==============================================================================
-# HistoryServer
-#==============================================================================
-
-# The HistoryServer is started and stopped via bin/historyserver.sh 
(start|stop)
-
-# Directory to upload completed jobs to. Add this directory to the list of
-# monitored directories of the HistoryServer as well (see below).
-#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
-
-# The address under which the web-based HistoryServer listens.
-#historyserver.web.address: 0.0.0.0
-
-# The port under which the web-based HistoryServer listens.
-#historyserver.web.port: 8082
-
-# Comma separated list of directories to monitor for completed jobs.
-#historyserver.archive.fs.dir: hdfs:///completed-jobs/
-
-# Interval in milliseconds for refreshing the monitored directories.
-#historyserver.archive.fs.refresh-interval: 10000
-
diff --git 
a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java 
b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
index 744ae1dccdd..df86e666a20 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
@@ -18,15 +18,24 @@
 
 package org.apache.flink.dist;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.util.bash.BashJavaUtils;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,6 +58,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
     private static final String RUN_EXTRACT_LOGGING_OUTPUTS_SCRIPT =
             "src/test/bin/runExtractLoggingOutputs.sh";
 
+    @TempDir private Path tmpDir;
+
     @Test
     void testGetTmResourceParamsConfigs() throws Exception {
         int expectedResultLines = 2;
@@ -122,7 +133,7 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
 
     @Test
     void testGetConfiguration() throws Exception {
-        int expectedResultLines = 13;
+        int expectedResultLines = 26;
         String[] commands = {
             RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
             
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -135,7 +146,7 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
 
     @Test
     void testGetConfigurationRemoveKey() throws Exception {
-        int expectedResultLines = 12;
+        int expectedResultLines = 24;
         String[] commands = {
             RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
             
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -146,12 +157,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
         List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
 
         assertThat(lines).hasSize(expectedResultLines);
-        assertThat(lines).doesNotContain("parallelism.default: 1");
+        Configuration configuration = loadConfiguration(lines);
+        
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty();
     }
 
     @Test
     void testGetConfigurationRemoveKeyValue() throws Exception {
-        int expectedResultLines = 12;
+        int expectedResultLines = 24;
         String[] commands = {
             RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
             
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -162,12 +174,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
         List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
 
         assertThat(lines).hasSize(expectedResultLines);
-        assertThat(lines).doesNotContain("parallelism.default: 1");
+        Configuration configuration = loadConfiguration(lines);
+        
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty();
     }
 
     @Test
     void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception 
{
-        int expectedResultLines = 13;
+        int expectedResultLines = 26;
         String[] commands = {
             RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
             
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -178,12 +191,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
         List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
 
         assertThat(lines).hasSize(expectedResultLines);
-        assertThat(lines).contains("parallelism.default: 1");
+        Configuration configuration = loadConfiguration(lines);
+        
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1);
     }
 
     @Test
     void testGetConfigurationReplaceKeyValue() throws Exception {
-        int expectedResultLines = 13;
+        int expectedResultLines = 26;
         String[] commands = {
             RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
             
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -194,13 +208,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
         List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
 
         assertThat(lines).hasSize(expectedResultLines);
-        assertThat(lines).doesNotContain("parallelism.default: 1");
-        assertThat(lines).contains("parallelism.default: 2");
+        Configuration configuration = loadConfiguration(lines);
+        
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(2);
     }
 
     @Test
     void testGetConfigurationReplaceKeyValueNotMatchingValue() throws 
Exception {
-        int expectedResultLines = 13;
+        int expectedResultLines = 26;
         String[] commands = {
             RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
             
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -211,7 +225,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
         List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
 
         assertThat(lines).hasSize(expectedResultLines);
-        assertThat(lines).doesNotContain("parallelism.default: 3");
+        Configuration configuration = loadConfiguration(lines);
+        
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1);
     }
 
     private static Map<String, String> parseAndAssertDynamicParameters(
@@ -274,4 +289,16 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
 
         assertThat(actualOutput).isEqualTo(expectedOutput);
     }
+
+    private Configuration loadConfiguration(List<String> lines) throws 
IOException {
+        File file =
+                TempDirUtils.newFile(
+                        tmpDir.toAbsolutePath(), 
GlobalConfiguration.FLINK_CONF_FILENAME);
+        try (final PrintWriter pw = new PrintWriter(file)) {
+            for (String line : lines) {
+                pw.println(line);
+            }
+        }
+        return 
GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+    }
 }
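
The assertion changes above follow from the format switch: once the script
emits standard nested YAML, the literal line "parallelism.default: 1" may not
appear even though the option is set, so matching raw output lines becomes
fragile. Writing the output to a conf file and reloading it, as the
loadConfiguration helper does, keeps the checks representation-independent. A
self-contained sketch of that pattern (the temp-directory handling is
illustrative, not the test's exact mechanics):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;
    import org.apache.flink.configuration.GlobalConfiguration;

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Arrays;
    import java.util.List;

    public class ParseOutputSketch {
        public static void main(String[] args) throws Exception {
            // Nested YAML output: the literal line "parallelism.default: 1"
            // never appears, yet the option is set.
            List<String> lines = Arrays.asList("parallelism:", "  default: 1");

            Path confDir = Files.createTempDirectory("conf");
            Files.write(confDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME), lines);

            Configuration conf = GlobalConfiguration.loadConfiguration(confDir.toString());
            System.out.println(conf.getOptional(CoreOptions.DEFAULT_PARALLELISM)); // Optional[1]
        }
    }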
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 5969f0850d9..92c9986e394 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -418,11 +418,9 @@ public final class FlinkDistribution {
         mergedConfig.addAll(defaultConfig);
         mergedConfig.addAll(config);
 
-        // NOTE: Before we change the default conf file in the flink-dist to 
'config.yaml', we
-        // need to use the legacy flink conf file 'flink-conf.yaml' here.
         Files.write(
-                conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME),
-                ConfigurationUtils.convertConfigToWritableLines(mergedConfig, 
true));
+                conf.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                ConfigurationUtils.convertConfigToWritableLines(mergedConfig, 
false));
     }
 
     public void setTaskExecutorHosts(Collection<String> taskExecutorHosts) 
throws IOException {
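
The flipped boolean above goes together with the file rename:
convertConfigToWritableLines renders either the legacy flat lines or standard
YAML, and the distribution now writes the latter into conf/config.yaml. A
hedged sketch, assuming the second argument toggles flat vs. standard
rendering and that the method returns List<String> (the diff only shows it
being passed to Files.write):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ConfigurationUtils;
    import org.apache.flink.configuration.CoreOptions;

    import java.util.List;

    public class WriteConfigSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.set(CoreOptions.DEFAULT_PARALLELISM, 2);

            // true  -> legacy flat rendering, e.g. "parallelism.default: 2"
            // false -> standard YAML rendering, e.g. "parallelism:" / "  default: 2"
            List<String> flat = ConfigurationUtils.convertConfigToWritableLines(config, true);
            List<String> nested = ConfigurationUtils.convertConfigToWritableLines(config, false);
            flat.forEach(System.out::println);
            nested.forEach(System.out::println);
        }
    }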
diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
old mode 100644
new mode 100755
index a7c4ce5a68a..d3452b4b1bb
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -50,6 +50,16 @@ TEST_INFRA_DIR=`pwd -P`
 cd $TEST_ROOT
 
 source "${TEST_INFRA_DIR}/common_utils.sh"
+source "${FLINK_DIR}/bin/bash-java-utils.sh"
+
+if [[ -z "${FLINK_CONF_DIR:-}" ]]; then
+    FLINK_CONF_DIR="$FLINK_DIR/conf"
+fi
+FLINK_CONF=${FLINK_CONF_DIR}/config.yaml
+# Flatten the configuration file config.yaml so that end-to-end test cases 
+# can modify it directly through shell scripts.
+output=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_DIR}/bin" 
"${FLINK_DIR}/lib" -flatten)
+echo "$output" > $FLINK_CONF
 
 NODENAME=${NODENAME:-"localhost"}
 
@@ -143,14 +153,14 @@ function swap_planner_scala_with_planner_loader() {
 
 function delete_config_key() {
     local config_key=$1
-    sed -i -e "/^${config_key}: /d" ${FLINK_DIR}/conf/flink-conf.yaml
+    sed -i -e "/^${config_key}: /d" $FLINK_CONF
 }
 
 function set_config_key() {
     local config_key=$1
     local value=$2
     delete_config_key ${config_key}
-    echo "$config_key: $value" >> $FLINK_DIR/conf/flink-conf.yaml
+    echo "$config_key: $value" >> $FLINK_CONF
 }
 
 function create_ha_config() {
@@ -166,7 +176,7 @@ function create_ha_config() {
     # This must have all the masters to be used in HA.
     echo "localhost:8081" > ${FLINK_DIR}/conf/masters
 
-    # then move on to create the flink-conf.yaml
+    # then move on to create the config.yaml
     
#==============================================================================
     # Common
     
#==============================================================================
@@ -688,7 +698,7 @@ function setup_flink_slf4j_metric_reporter() {
   METRIC_NAME_PATTERN="${1:-"*"}"
   set_config_key "metrics.reporter.slf4j.factory.class" 
"org.apache.flink.metrics.slf4j.Slf4jReporterFactory"
   set_config_key "metrics.reporter.slf4j.interval" "1 SECONDS"
-  set_config_key "metrics.reporter.slf4j.filter.includes" 
"*:${METRIC_NAME_PATTERN}"
+  set_config_key "metrics.reporter.slf4j.filter.includes" 
"'*:${METRIC_NAME_PATTERN}'"
 }
 
 function get_job_exceptions {
diff --git a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh 
b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
index 97fcca09d5a..ec582c05f88 100755
--- a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
@@ -131,10 +131,10 @@ security.kerberos.login.principal: hadoop-user
 slot.request.timeout: 120000
 END
 )
-    docker exec master bash -c "echo \"${FLINK_CONFIG}\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+    docker exec master bash -c "echo \"${FLINK_CONFIG}\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
 
     echo "Flink config:"
-    docker exec master bash -c "cat 
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+    docker exec master bash -c "cat 
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
 }
 
 function debug_copy_and_show_logs {
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh 
b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index b9134ebcb4a..a078e84da6b 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -67,9 +67,13 @@ on_exit test_clean_up
 
 cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
 
-echo "taskmanager.memory.task.off-heap.size: 768m" >> 
"${TEST_DATA_DIR}/conf/flink-conf.yaml"
-echo "taskmanager.memory.process.size: 3172m" >> 
"${TEST_DATA_DIR}/conf/flink-conf.yaml"
-echo "taskmanager.numberOfTaskSlots: 5" >> 
"${TEST_DATA_DIR}/conf/flink-conf.yaml"
+# standard yaml do not allow duplicate keys
+sed -i -e "/^taskmanager.memory.task.off-heap.size: /d" 
"${TEST_DATA_DIR}/conf/config.yaml"
+sed -i -e "/^taskmanager.memory.process.size: /d" 
"${TEST_DATA_DIR}/conf/config.yaml"
+sed -i -e "/^taskmanager.numberOfTaskSlots: /d" 
"${TEST_DATA_DIR}/conf/config.yaml"
+echo "taskmanager.memory.task.off-heap.size: 768m" >> 
"${TEST_DATA_DIR}/conf/config.yaml"
+echo "taskmanager.memory.process.size: 3172m" >> 
"${TEST_DATA_DIR}/conf/config.yaml"
+echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/config.yaml"
 export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf"
 
 FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P`
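Because the flattened config.yaml is still parsed as standard YAML, appending a second line for an existing key would create a duplicate key, hence the delete-before-append above. The same idiom in Python, as a hypothetical helper mirroring the sed/echo pair:

    def set_config_key(path, key, value):
        # Drop any existing line for the key, then append the new setting.
        with open(path) as f:
            lines = [line for line in f if not line.startswith(key + ": ")]
        lines.append(f"{key}: {value}\n")
        with open(path, "w") as f:
            f.writelines(lines)

    set_config_key("conf/config.yaml", "taskmanager.numberOfTaskSlots", "5")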
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
index 1274a38851c..bb9e456a06d 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
@@ -81,7 +81,7 @@ if [[ ! "${YARN_APPLICATION_LOGS}" =~ "Receive initial delegation tokens from re
 fi
 
 echo "Running Job without configured keytab, the exception you see below is 
expected"
-docker exec master bash -c "echo \"\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+docker exec master bash -c "echo \"\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
 # verify that it doesn't work if we don't configure a keytab
 docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
     /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run-application \
diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
index f80e0635224..37e839103a9 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
@@ -66,7 +66,7 @@ else
 fi
 
 echo "Running Job without configured keytab, the exception you see below is 
expected"
-docker exec master bash -c "echo \"\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+docker exec master bash -c "echo \"\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
 # verify that it doesn't work if we don't configure a keytab
 docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
     /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run \
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
index 31e89dacac4..badd86072d8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
@@ -112,7 +112,7 @@ public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceS
         final long splitLength = split.length();
 
         // Using Flink FileSystem instead of Hadoop FileSystem directly, so we can get the hadoop
-        // config that create inputFile needed from flink-conf.yaml
+        // config needed to create the inputFile from config.yaml
         final FileSystem fs = filePath.getFileSystem();
         final ParquetInputFile inputFile =
                 new ParquetInputFile(fs.open(filePath), fs.getFileStatus(filePath).getLen());
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d67893e4d21..3a12968663e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1564,7 +1564,7 @@ public abstract class DataSet<T> {
      * dataset.writeAsText("file:///path1"); }</pre>
      *   <li>A directory is always created when <a
      *       href="https://nightlies.apache.org/flink/flink-docs-master/setup/config.html#file-systems">fs.output.always-create-directory</a>
-     *       is set to true in flink-conf.yaml file, even when parallelism is set to 1.
+     *       is set to true in config.yaml file, even when parallelism is set to 1.
      *       <pre>{@code .
      * └── path1/
      *     └── 1 }</pre>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 27382e41be4..8165ed2e5d2 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -301,7 +301,7 @@ public class KubernetesConfigOptions {
                     .stringType()
                     .defaultValue("/opt/flink/conf")
                     .withDescription(
-                            "The flink conf directory that will be mounted in 
pod. The flink-conf.yaml, log4j.properties, "
+                            "The flink conf directory that will be mounted in 
pod. The config.yaml, log4j.properties, "
                                     + "logback.xml in this path will be 
overwritten from config map.");
 
     public static final ConfigOption<String> FLINK_LOG_DIR =
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 2203770e267..3c3ee9a5154 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -64,7 +64,7 @@ import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or
+ * Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or
  * TaskManager pod.
  */
 public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index c6b335449a8..21afbefc287 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -78,7 +78,7 @@ public interface KubernetesParameters {
      */
     List<Map<String, String>> getTolerations();
 
-    /** Directory in Pod that stores the flink-conf.yaml, log4j.properties, and the logback.xml. */
+    /** Directory in Pod that stores the config.yaml, log4j.properties, and the logback.xml. */
     String getFlinkConfDirInPod();
 
     /** Directory in Pod that saves the log files. */
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index 2d74afd58c2..f692a63f0f8 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -32,3 +32,4 @@ pemja==0.4.1; platform_system != 'Windows'
 httplib2>=0.19.0
 protobuf>=3.19.0
 pytest~=7.0
+pyyaml>=6.0.1
diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py
index 54e82962f43..35416b8fa47 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -69,10 +69,21 @@ class Configuration:
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            add_jars_to_context_class_loader(value.split(";"))
+            jar_urls = Configuration.parse_jars_value(value, jvm)
+            add_jars_to_context_class_loader(jar_urls)
         self._j_configuration.setString(key, value)
         return self
 
+    @staticmethod
+    def parse_jars_value(value: str, jvm):
+        is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+        if is_standard_yaml:
+            import yaml
+            jar_urls_list = yaml.safe_load(value)
+            if isinstance(jar_urls_list, list):
+                return jar_urls_list
+        return value.split(";")
+
     def get_integer(self, key: str, default_value: int) -> int:
         """
         Returns the value associated with the given key as an integer.
diff --git a/flink-python/pyflink/common/restart_strategy.py b/flink-python/pyflink/common/restart_strategy.py
index b1c4621bf3b..52962e138e6 100644
--- a/flink-python/pyflink/common/restart_strategy.py
+++ b/flink-python/pyflink/common/restart_strategy.py
@@ -152,7 +152,7 @@ class RestartStrategies(object):
         """
        Restart strategy configuration that could be used by jobs to use cluster level restart
        strategy. Useful especially when one has a custom implementation of restart strategy set via
-        flink-conf.yaml.
+        config.yaml.
         """
 
         def __init__(self, j_restart_strategy=None):
diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py
index 6ffe21912be..7b4c69917bb 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -280,7 +280,7 @@ class EmbeddedRocksDBStateBackend(StateBackend):
         Sets the predefined options for RocksDB.
 
         If user-configured options within ``RocksDBConfigurableOptions`` is set (through
-        flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`),
+        config.yaml) or a user-defined options factory is set (via :func:`setOptions`),
         then the options from the factory are applied on top of the here specified
         predefined options and customized options.
 
@@ -301,7 +301,7 @@ class EmbeddedRocksDBStateBackend(StateBackend):
         are :data:`PredefinedOptions.DEFAULT`.
 
         If user-configured options within ``RocksDBConfigurableOptions`` is set (through
-        flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`),
+        config.yaml) or a user-defined options factory is set (via :func:`setOptions`),
         then the options from the factory are applied on top of the predefined and customized
         options.
 
@@ -320,7 +320,7 @@ class EmbeddedRocksDBStateBackend(StateBackend):
 
        The options created by the factory here are applied on top of the pre-defined
        options profile selected via :func:`set_predefined_options` and user-configured
-        options from configuration set through flink-conf.yaml with keys in
+        options from configuration set through config.yaml with keys in
         ``RocksDBConfigurableOptions``.
 
        :param options_factory_class_name: The fully-qualified class name of the options
@@ -383,7 +383,7 @@ class MemoryStateBackend(StateBackend):
         >> env.set_state_backend(HashMapStateBackend())
        >> env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
 
-    If you are configuring your state backend via the `flink-conf.yaml` please make the following
+    If you are configuring your state backend via the `config.yaml` please make the following
     changes.
 
     ```
@@ -535,7 +535,7 @@ class FsStateBackend(StateBackend):
         >> env.set_state_backend(HashMapStateBackend())
        >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints")
 
-    If you are configuring your state backend via the `flink-conf.yaml` please set your state
+    If you are configuring your state backend via the `config.yaml` please set your state
     backend type to `hashmap`.
 
    This state backend holds the working state in the memory (JVM heap) of the TaskManagers.
@@ -717,7 +717,7 @@ class RocksDBStateBackend(StateBackend):
         >> env.set_state_backend(EmbeddedRocksDBStateBackend())
        >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints")
 
-    If you are configuring your state backend via the `flink-conf.yaml` no changes are required.
+    If you are configuring your state backend via the `config.yaml` no changes are required.
 
    A State Backend that stores its state in ``RocksDB``. This state backend can
     store very large state that exceeds memory and spills to disk.
@@ -862,7 +862,7 @@ class RocksDBStateBackend(StateBackend):
         Sets the predefined options for RocksDB.
 
         If user-configured options within ``RocksDBConfigurableOptions`` is set (through
-        flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`),
+        config.yaml) or a user-defined options factory is set (via :func:`setOptions`),
         then the options from the factory are applied on top of the here specified
         predefined options and customized options.
 
@@ -882,7 +882,7 @@ class RocksDBStateBackend(StateBackend):
         are :data:`PredefinedOptions.DEFAULT`.
 
         If user-configured options within ``RocksDBConfigurableOptions`` is set (through
-        flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`),
+        config.yaml) or a user-defined options factory is set (via :func:`setOptions`),
         then the options from the factory are applied on top of the predefined and customized
         options.
 
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index d1b7ce41118..20fcf85d7ed 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -842,7 +842,7 @@ class StreamExecutionEnvironment(object):
         method returns a local execution environment.
 
        When executed from the command line the given configuration is stacked on top of the
-        global configuration which comes from the flink-conf.yaml, potentially overriding
+        global configuration which comes from the config.yaml, potentially overriding
         duplicated options.
 
        :param configuration: The configuration to instantiate the environment with.
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index dcb3f34884e..09b77231ef7 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -43,29 +43,43 @@ def on_windows():
     return platform.system() == "Windows"
 
 
-def read_from_config(key, default_value, flink_conf_file):
-    value = default_value
-    # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
-    # using the tainted value and might allow an attacker to access, modify, or test the existence
-    # of critical or sensitive files.
-    with open(os.path.realpath(flink_conf_file), "r") as f:
-        while True:
-            line = f.readline()
-            if not line:
-                break
-            if line.startswith("#") or len(line.strip()) == 0:
-                continue
-            k, v = line.split(":", 1)
-            if k.strip() == key:
-                value = v.strip()
-    return value
+def read_from_config(key, default_value, flink_conf_directory):
+    import yaml
+    # try to find flink-conf.yaml file in flink_conf_directory
+    flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
+    if os.path.isfile(flink_conf_file):
+        # If flink-conf.yaml exists, use the old parsing logic to read the value
+        # get the realpath of tainted path value to avoid CWE22 problem that constructs a path
+        # or URI using the tainted value and might allow an attacker to access, modify, or test
+        # the existence of critical or sensitive files.
+        with open(os.path.realpath(flink_conf_file), "r") as f:
+            while True:
+                line = f.readline()
+                if not line:
+                    break
+                if line.startswith("#") or len(line.strip()) == 0:
+                    continue
+                k, v = line.split(":", 1)
+                if k.strip() == key:
+                    return v.strip()
+    else:
+        # If flink-conf.yaml does not exist, try to find config.yaml instead
+        config_file = os.path.join(flink_conf_directory, "config.yaml")
+        if os.path.isfile(config_file):
+            # If config.yaml exists, use YAML parser to read the value
+            with open(os.path.realpath(config_file), "r") as f:
+                config = yaml.safe_load(f)
+                return config.get(key, default_value)
+
+    # Key not found, or neither config file exists: return the default value
+    return default_value
 
 
 def find_java_executable():
     java_executable = "java.exe" if on_windows() else "java"
     flink_home = _find_flink_home()
-    flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
-    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+    flink_conf_directory = os.path.join(flink_home, "conf")
+    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_directory)
 
     if java_home is None and "JAVA_HOME" in os.environ:
         java_home = os.environ["JAVA_HOME"]
@@ -120,13 +134,12 @@ def construct_log_settings(env):
 
     flink_home = os.path.realpath(_find_flink_home())
     flink_conf_dir = env['FLINK_CONF_DIR']
-    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
 
     if "FLINK_LOG_DIR" in env:
         flink_log_dir = env["FLINK_LOG_DIR"]
     else:
         flink_log_dir = read_from_config(
-            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), 
env['FLINK_CONF_DIR'])
 
     if "LOG4J_PROPERTIES" in env:
         log4j_properties = env["LOG4J_PROPERTIES"]
@@ -156,14 +169,13 @@ def construct_log_settings(env):
 
 
 def get_jvm_opts(env):
-    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
     jvm_opts = env.get("FLINK_ENV_JAVA_OPTS")
     if jvm_opts is None:
-        default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", 
flink_conf_file)
+        default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", 
env['FLINK_CONF_DIR'])
         extra_jvm_opts = read_from_config(
             KEY_ENV_JAVA_OPTS,
-            read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", 
flink_conf_file),
-            flink_conf_file)
+            read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", 
env['FLINK_CONF_DIR']),
+            env['FLINK_CONF_DIR'])
         jvm_opts = default_jvm_opts + " " + extra_jvm_opts
 
     # Remove leading and trailing double quotes (if present) of value
@@ -194,8 +206,6 @@ def construct_flink_classpath(env):
 
 
 def construct_hadoop_classpath(env):
-    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
-
     hadoop_conf_dir = ""
     if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
         if os.path.isdir("/etc/hadoop/conf"):
@@ -212,11 +222,11 @@ def construct_hadoop_classpath(env):
     return os.pathsep.join(
         [env.get("HADOOP_CLASSPATH", ""),
          env.get("YARN_CONF_DIR",
-                 read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+                 read_from_config(KEY_ENV_YARN_CONF_DIR, "", 
env['FLINK_CONF_DIR'])),
          env.get("HADOOP_CONF_DIR",
-                 read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, 
flink_conf_file)),
+                 read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, 
env['FLINK_CONF_DIR'])),
          env.get("HBASE_CONF_DIR",
-                 read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, 
flink_conf_file))])
+                 read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, 
env['FLINK_CONF_DIR']))])
 
 
 def construct_test_classpath(env):
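The gateway server now resolves settings from the conf directory instead of a fixed file path: a legacy flink-conf.yaml, when present, keeps the old line-based parsing; otherwise config.yaml is read with a YAML parser. A condensed sketch of that precedence (illustrative; it assumes flat top-level keys in config.yaml, as produced by the flattened format):

    import os
    import yaml

    def lookup(conf_dir, key, default=None):
        legacy = os.path.join(conf_dir, "flink-conf.yaml")
        if os.path.isfile(legacy):
            # The legacy file wins and is parsed line by line, as before.
            with open(legacy) as f:
                for line in f:
                    if line.startswith("#") or ":" not in line:
                        continue
                    k, v = line.split(":", 1)
                    if k.strip() == key:
                        return v.strip()
            return default
        modern = os.path.join(conf_dir, "config.yaml")
        if os.path.isfile(modern):
            with open(modern) as f:
                config = yaml.safe_load(f) or {}
            return config.get(key, default)
        return default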
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index bd12a447823..ba17767c776 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -36,7 +36,7 @@ class TableConfig(object):
    This class is a pure API class that abstracts configuration from various sources. Currently,
    configuration can be set in any of the following layers (in the given order):
 
-    - flink-conf.yaml
+    - config.yaml
     - CLI parameters
    - :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to DataStream API
     - :func:`~EnvironmentSettings.Builder.with_configuration`
@@ -106,8 +106,8 @@ class TableConfig(object):
         jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         if key in [jars_key, classpaths_key]:
-            add_jars_to_context_class_loader(value.split(";"))
-
+            jar_urls = Configuration.parse_jars_value(value, jvm)
+            add_jars_to_context_class_loader(jar_urls)
         return self
 
     def get_local_timezone(self) -> str:
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 3327261ac41..ebfcc7cb095 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1540,23 +1540,33 @@ class TableEnvironment(object):
             j_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), sys.executable)
 
     def _add_jars_to_j_env_config(self, config_key):
-        jvm = get_gateway().jvm
         jar_urls = self.get_config().get(config_key, None)
-        if jar_urls is not None:
-            # normalize
+
+        if jar_urls:
+            jvm = get_gateway().jvm
             jar_urls_list = []
-            for url in jar_urls.split(";"):
-                url = url.strip()
-                if url != "":
-                    jar_urls_list.append(jvm.java.net.URL(url).toString())
+            parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm)
+            url_strings = [
+                jvm.java.net.URL(url).toString() if url else ""
+                for url in parsed_jar_urls
+            ]
+            self._parse_urls(url_strings, jar_urls_list)
+
             j_configuration = get_j_env_configuration(self._get_j_env())
-            if j_configuration.containsKey(config_key):
-                for url in j_configuration.getString(config_key, "").split(";"):
-                    url = url.strip()
-                    if url != "" and url not in jar_urls_list:
-                        jar_urls_list.append(url)
+            parsed_jar_urls = Configuration.parse_jars_value(
+                j_configuration.getString(config_key, ""),
+                jvm
+            )
+            self._parse_urls(parsed_jar_urls, jar_urls_list)
+
             j_configuration.setString(config_key, ";".join(jar_urls_list))
 
+    def _parse_urls(self, jar_urls, jar_urls_list):
+        for url in jar_urls:
+            url = url.strip()
+            if url != "" and url not in jar_urls_list:
+                jar_urls_list.append(url)
+
     def _get_j_env(self):
         return self._j_tenv.getPlanner().getExecEnv()
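The rewritten _add_jars_to_j_env_config merges the jar URLs configured on this environment with those already present in the underlying configuration, normalizing and de-duplicating before re-serializing with ';'. The merge itself reduces to something like the following plain-Python sketch (illustrative only):

    def merge_jar_urls(user_jars, env_jars):
        # Keep first occurrence, drop blanks, preserve order.
        merged = []
        for url in list(user_jars) + list(env_jars):
            url = url.strip()
            if url and url not in merged:
                merged.append(url)
        return ";".join(merged)

    print(merge_jar_urls(["file:///a.jar", "file:///b.jar"],
                         ["file:///b.jar", "file:///c.jar"]))
    # file:///a.jar;file:///b.jar;file:///c.jar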
 
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 7161354be9f..24637fe4c7d 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -325,7 +325,9 @@ try:
                         'pandas>=1.3.0',
                         'pyarrow>=5.0.0',
                         'pemja==0.4.1;platform_system != "Windows"',
-                        'httplib2>=0.19.0', apache_flink_libraries_dependency]
+                        'httplib2>=0.19.0',
+                        'pyyaml>=6.0.1',
+                        apache_flink_libraries_dependency]
 
     setup(
         name='apache-flink',
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
index d7959c425e9..c58267e6fc9 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
@@ -48,7 +48,7 @@ public class PythonConfig implements ReadableConfig {
     }
 
     /**
-     * Configuration adopted from the outer layer, e.g. flink-conf.yaml, command line arguments,
+     * Configuration adopted from the outer layer, e.g. config.yaml, command line arguments,
      * TableConfig, etc.
      */
     private final ReadableConfig configuration;
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 17f425cebb8..142457bd19c 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -182,7 +182,7 @@ public class PythonOptions {
                                     .text("2. the command line option 
\"-pyclientexec\";")
                                     .linebreak()
                                     .text(
-                                            "3. the configuration 
'python.client.executable' defined in flink-conf.yaml")
+                                            "3. the configuration 
'python.client.executable' defined in config.yaml")
                                     .linebreak()
                                     .text("4. the environment variable 
PYFLINK_CLIENT_EXECUTABLE;")
                                     .build());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
index d0185c5f0ee..8ea11a65ba3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
@@ -46,8 +46,8 @@ public final class RestartBackoffTimeStrategyFactoryLoader {
      *   <li>Strategy set within job graph, i.e. {@link
      *       RestartStrategies.RestartStrategyConfiguration}, unless the config is {@link
      *       RestartStrategies.FallbackRestartStrategyConfiguration}.
-     *   <li>Strategy set in the cluster(server-side) config (flink-conf.yaml), unless the strategy
-     *       is not specified
+     *   <li>Strategy set in the cluster(server-side) config (config.yaml), unless the strategy is
+     *       not specified
      *   <li>{@link
      *       FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if
      *       checkpointing is enabled. Otherwise {@link
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
index ea4f007449b..df4809714ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
@@ -79,7 +79,7 @@ public class CheckpointStorageLoader {
             if (logger != null) {
                 logger.debug(
                         "The configuration {} has not be set in the current"
-                                + " sessions flink-conf.yaml. Falling back to 
a default CheckpointStorage"
+                                + " sessions config.yaml. Falling back to a 
default CheckpointStorage"
                                 + " type. Users are strongly encouraged 
explicitly set this configuration"
                                 + " so they understand how their applications 
are checkpointing"
                                 + " snapshots for fault-tolerance.",
@@ -138,7 +138,7 @@ public class CheckpointStorageLoader {
      * StreamExecutionEnvironment}.
      *
     * <p>3) Use the {@link CheckpointStorage} instance configured via the clusters
-     * <b>flink-conf.yaml</b>.
+     * <b>config.yaml</b>.
      *
      * <p>4) Load a default {@link CheckpointStorage} instance.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index aded04d7b6b..ec219432faa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *             env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
  * }</pre>
  *
- * <p>If you are configuring your state backend via the {@code flink-conf.yaml} please make the
+ * <p>If you are configuring your state backend via the {@code config.yaml} please make the
  * following changes set your state backend type to "hashmap" {@code state.backend.type: hashmap}.
  *
  * <p>This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 87d8d77803e..6345c2b4602 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *             env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
  * }</pre>
  *
- * <p>If you are configuring your state backend via the {@code flink-conf.yaml} please make the
+ * <p>If you are configuring your state backend via the {@code config.yaml} please make the
  * following changes:
  *
  * <pre>{@code
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 048515b5b44..6d15c53c1a2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -698,7 +698,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
      * Sets the predefined options for RocksDB.
      *
     * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) or a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
     * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the here specified predefined options and customized options.
      *
@@ -714,7 +714,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
      * PredefinedOptions#DEFAULT}.
      *
     * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) of a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
     * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the predefined and customized options.
      *
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 69576520fc3..e78d9fb0eb5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -145,7 +145,7 @@ public class RocksDBOptions {
                                             + "This option only takes effect 
if neither '%s' nor '%s' are not configured. If none is configured "
                                             + "then each RocksDB column family 
state has its own memory caches (as controlled by the column "
                                             + "family options). "
-                                            + "The relevant options for the 
shared resources (e.g. write-buffer-ratio) can be set on the same level 
(flink-conf.yaml)."
+                                            + "The relevant options for the 
shared resources (e.g. write-buffer-ratio) can be set on the same level 
(config.yaml)."
                                             + "Note, that this feature breaks 
resource isolation between the slots",
                                     USE_MANAGED_MEMORY.key(), 
FIX_PER_SLOT_MEMORY_SIZE.key()));
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 0c37a1e6bc5..dd3fcb70cf4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -61,8 +61,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *             env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints");
  * }</pre>
  *
- * <p>If you are configuring your state backend via the {@code flink-conf.yaml} no changes are
- * required.
+ * <p>If you are configuring your state backend via the {@code config.yaml} no changes are required.
  *
 * <p>A State Backend that stores its state in {@code RocksDB}. This state backend can store very
  * large state that exceeds memory and spills to disk.
@@ -399,7 +398,7 @@ public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend
      * Sets the predefined options for RocksDB.
      *
     * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) or a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
     * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the here specified predefined options and customized options.
      *
@@ -415,7 +414,7 @@ public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend
      * PredefinedOptions#DEFAULT}.
      *
     * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through
-     * flink-conf.yaml) of a user-defined options factory is set (via {@link
+     * config.yaml) or a user-defined options factory is set (via {@link
     * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on
      * top of the predefined and customized options.
      *
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 82a42dfdb3c..34d27e1bb59 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -249,7 +249,7 @@ public class RocksDBStateBackendConfigTest {
         env.close();
     }
 
-    /** Validates that user custom configuration from code should override the flink-conf.yaml. */
+    /** Validates that user custom configuration from code should override the config.yaml. */
     @Test
     public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
         final MockEnvironment env = new MockEnvironmentBuilder().build();
@@ -524,7 +524,7 @@ public class RocksDBStateBackendConfigTest {
         // verify that we would use PredefinedOptions.DEFAULT by default.
         assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
 
-        // verify that user could configure predefined options via flink-conf.yaml
+        // verify that user could configure predefined options via config.yaml
         Configuration configuration = new Configuration();
         configuration.set(
                 RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
@@ -640,7 +640,7 @@ public class RocksDBStateBackendConfigTest {
         String checkpointPath = tempFolder.newFolder().toURI().toString();
         RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
 
-        // verify that user-defined options factory could be configured via flink-conf.yaml
+        // verify that user-defined options factory could be configured via config.yaml
         Configuration config = new Configuration();
         config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), TestOptionsFactory.class.getName());
         config.setString(TestOptionsFactory.BACKGROUND_JOBS_OPTION.key(), "4");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f14f3d4400a..b179ead220d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2591,7 +2591,7 @@ public class StreamExecutionEnvironment implements AutoCloseable {
     * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
      *
     * <p>When executed from the command line the given configuration is stacked on top of the
-     * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
+     * global configuration which comes from the {@code config.yaml}, potentially overriding
      * duplicated options.
      *
     * @param configuration The configuration to instantiate the environment with.
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index 551e05ef962..eae416cd0a6 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -127,8 +127,8 @@ public class DefaultContext {
     // -------------------------------------------------------------------------------------------
 
     /**
-     * Build the {@link DefaultContext} from flink-conf.yaml, dynamic configuration and users
-     * specified jars.
+     * Build the {@link DefaultContext} from config.yaml, dynamic configuration and users specified
+     * jars.
      *
      * @param dynamicConfig user specified configuration.
      * @param dependencies user specified jars
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
index 0c3e3ede7a6..65daeb4a3a8 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
@@ -43,7 +43,7 @@ public class SqlGatewayRestEndpointTestUtils {
         return rebuildRestEndpointOptions(context.getEndpointOptions());
     }
 
-    /** Create the configuration generated from flink-conf.yaml. */
+    /** Create the configuration generated from config.yaml. */
     public static Configuration getFlinkConfig(
             String address, String bindAddress, String portRange) {
         final Configuration config = new Configuration();
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index e5d22ed943e..8ca455d31b4 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -184,7 +184,7 @@ class SessionContextTest {
                                 CatalogListener1.class.getName(),
                                 CatalogListener2.class.getName()));
 
-        // Find and create listeners from flink-conf.yaml for session
+        // Find and create listeners from config.yaml for session
         flinkConfig.set(
                 TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS,
                 Arrays.asList(CatalogFactory1.IDENTIFIER, CatalogFactory2.IDENTIFIER));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index f80ad6393ec..3d3ffe29cca 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -50,7 +50,7 @@ import static org.apache.flink.table.api.internal.TableConfigValidation.validate
 * configuration can be set in any of the following layers (in the given order):
  *
  * <ol>
- *   <li>{@code flink-conf.yaml},
+ *   <li>{@code config.yaml},
  *   <li>CLI parameters,
  *   <li>{@code StreamExecutionEnvironment} when bridging to DataStream API,
 *   <li>{@link EnvironmentSettings.Builder#withConfiguration(Configuration)} / {@link
@@ -103,7 +103,7 @@ public final class TableConfig implements WritableConfig, ReadableConfig {
 
     // Note to implementers:
    // TableConfig is a ReadableConfig which is built once the TableEnvironment is created and
-    // contains both the configuration defined in the execution context (flink-conf.yaml + CLI
+    // contains both the configuration defined in the execution context (config.yaml + CLI
    // params), stored in rootConfiguration, but also any extra configuration defined by the user in
     // the application, which has precedence over the execution configuration.
     //
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
index cd2bee6b39c..f3c79c4b35d 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
@@ -283,8 +283,8 @@ public class FlinkContainersSettings {
         }
 
         /**
-         * Sets a single Flink configuration parameter (the options for flink-conf.yaml) and returns
-         * a reference to this Builder enabling method chaining.
+         * Sets a single Flink configuration parameter (the options for config.yaml) and returns a
+         * reference to this Builder enabling method chaining.
          *
          * @param <T> The type parameter.
          * @param option The option.
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
index 38b542e1310..1ee0c9f6999 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
@@ -125,7 +125,7 @@ public class FlinkImageBuilder {
     }
 
     /**
-     * Sets Flink configuration. This configuration will be used for generating flink-conf.yaml for
+     * Sets Flink configuration. This configuration will be used for generating config.yaml for
      * configuring JobManager and TaskManager.
      */
     public FlinkImageBuilder setConfiguration(Configuration conf) {
@@ -209,12 +209,10 @@ public class FlinkImageBuilder {
             final Path flinkConfFile = createTemporaryFlinkConfFile(conf, tempDirectory);
 
             final Path log4jPropertiesFile = createTemporaryLog4jPropertiesFile(tempDirectory);
-            // Copy flink-conf.yaml into image
-            // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we
-            // need to use the legacy flink conf file 'flink-conf.yaml' here.
+            // Copy config.yaml into image
             filesToCopy.put(
                     flinkConfFile,
-                    Paths.get(flinkHome, "conf", GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME));
+                    Paths.get(flinkHome, "conf", GlobalConfiguration.FLINK_CONF_FILENAME));
             filesToCopy.put(
                     log4jPropertiesFile, Paths.get(flinkHome, "conf", 
LOG4J_PROPERTIES_FILENAME));
 
@@ -292,13 +290,10 @@ public class FlinkImageBuilder {
 
     private Path createTemporaryFlinkConfFile(Configuration finalConfiguration, Path tempDirectory)
             throws IOException {
-        // Create a temporary flink-conf.yaml file and write merged configurations into it
-        // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we
-        // need to use the legacy flink conf file 'flink-conf.yaml' here.
-        Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
+        Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.FLINK_CONF_FILENAME);
         Files.write(
                 flinkConfFile,
-                ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, true));
+                ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, false));
 
         return flinkConfFile;
     }
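With the second argument of convertConfigToWritableLines flipped from true to false, the test image now receives standard (non-flattened) YAML. The difference between the two shapes, sketched with PyYAML (the exact serialization is up to ConfigurationUtils; this merely illustrates the formats):

    import yaml

    # Standard YAML: keys are nested mappings.
    nested = {"taskmanager": {"memory": {"process": {"size": "3172m"}}}}
    print(yaml.safe_dump(nested, default_flow_style=False), end="")
    # taskmanager:
    #   memory:
    #     process:
    #       size: 3172m

    # Flattened legacy form: one dotted key per line.
    print("taskmanager.memory.process.size: 3172m")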
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index fcf32ea13be..ec7c6905ee7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -821,10 +821,9 @@ public abstract class YarnTestBase {
 
             File flinkConfDirPath =
                     TestUtils.findFile(
-                            // NOTE: Before we change the default conf file in the flink-dist to
-                            // 'config.yaml', we need to use the legacy flink conf file
-                            // 'flink-conf.yaml' here.
-                            flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"}));
+                            flinkDistRootDir,
+                            new ContainsName(
+                                    new String[] {GlobalConfiguration.FLINK_CONF_FILENAME}));
             assertThat(flinkConfDirPath).isNotNull();
 
             final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
@@ -839,7 +838,8 @@ public abstract class YarnTestBase {
             FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
 
             BootstrapTools.writeConfiguration(
-                    globalConfiguration, new File(tempConfPathForSecureRun, "flink-conf.yaml"));
+                    globalConfiguration,
+                    new File(tempConfPathForSecureRun, GlobalConfiguration.FLINK_CONF_FILENAME));
 
             String configDir = tempConfPathForSecureRun.getAbsolutePath();
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 4f81077b16e..5d9c77695e0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -485,7 +485,7 @@ public final class Utils {
      *
      * @param flinkConfig The Flink configuration.
      * @param tmParams Parameters for the task manager.
-     * @param configDirectory The configuration directory for the flink-conf.yaml
+     * @param configDirectory The configuration directory for the config.yaml
      * @param logDirectory The log directory.
      * @param hasLogback Uses logback?
      * @param hasLog4j Uses log4j?
