This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9721ce835f5 [FLINK-33577][dist] Change the default config file to
config.yaml in flink-dist.
9721ce835f5 is described below
commit 9721ce835f5a7f28f2ad187346e009633307097b
Author: JunRuiLee <[email protected]>
AuthorDate: Wed Aug 23 22:48:30 2023 +0800
[FLINK-33577][dist] Change the default config file to config.yaml in
flink-dist.
This closes #24177.
---
.../generated/kubernetes_config_configuration.html | 2 +-
.../shortcodes/generated/python_configuration.html | 2 +-
.../generated/rocksdb_configuration.html | 2 +-
.../generated/state_backend_rocksdb_section.html | 2 +-
.../api/java/hadoop/mapred/utils/HadoopUtils.java | 2 +-
.../common/restartstrategy/RestartStrategies.java | 2 +-
.../flink/configuration/ConfigConstants.java | 4 +-
.../configuration/ResourceManagerOptions.java | 2 +-
flink-dist/src/main/assemblies/bin.xml | 2 +-
flink-dist/src/main/flink-bin/bin/config.sh | 8 +-
flink-dist/src/main/resources/config.yaml | 298 ++++++++++++++++++++
flink-dist/src/main/resources/flink-conf.yaml | 311 ---------------------
.../org/apache/flink/dist/BashJavaUtilsITCase.java | 51 +++-
.../flink/tests/util/flink/FlinkDistribution.java | 6 +-
flink-end-to-end-tests/test-scripts/common.sh | 18 +-
.../test-scripts/common_yarn_docker.sh | 4 +-
.../test-scripts/test_pyflink.sh | 10 +-
.../test_yarn_application_kerberos_docker.sh | 2 +-
.../test-scripts/test_yarn_job_kerberos_docker.sh | 2 +-
.../parquet/ParquetVectorizedInputFormat.java | 2 +-
.../java/org/apache/flink/api/java/DataSet.java | 2 +-
.../configuration/KubernetesConfigOptions.java | 2 +-
.../decorators/FlinkConfMountDecorator.java | 2 +-
.../parameters/KubernetesParameters.java | 2 +-
flink-python/dev/dev-requirements.txt | 1 +
flink-python/pyflink/common/configuration.py | 13 +-
flink-python/pyflink/common/restart_strategy.py | 2 +-
flink-python/pyflink/datastream/state_backend.py | 16 +-
.../datastream/stream_execution_environment.py | 2 +-
flink-python/pyflink/pyflink_gateway_server.py | 68 +++--
flink-python/pyflink/table/table_config.py | 6 +-
flink-python/pyflink/table/table_environment.py | 34 ++-
flink-python/setup.py | 4 +-
.../java/org/apache/flink/python/PythonConfig.java | 2 +-
.../org/apache/flink/python/PythonOptions.java | 2 +-
.../RestartBackoffTimeStrategyFactoryLoader.java | 4 +-
.../runtime/state/CheckpointStorageLoader.java | 4 +-
.../runtime/state/filesystem/FsStateBackend.java | 2 +-
.../runtime/state/memory/MemoryStateBackend.java | 2 +-
.../state/EmbeddedRocksDBStateBackend.java | 4 +-
.../contrib/streaming/state/RocksDBOptions.java | 2 +-
.../streaming/state/RocksDBStateBackend.java | 7 +-
.../state/RocksDBStateBackendConfigTest.java | 6 +-
.../environment/StreamExecutionEnvironment.java | 2 +-
.../gateway/service/context/DefaultContext.java | 4 +-
.../rest/util/SqlGatewayRestEndpointTestUtils.java | 2 +-
.../service/context/SessionContextTest.java | 2 +-
.../org/apache/flink/table/api/TableConfig.java | 4 +-
.../container/FlinkContainersSettings.java | 4 +-
.../testframe/container/FlinkImageBuilder.java | 15 +-
.../java/org/apache/flink/yarn/YarnTestBase.java | 10 +-
.../src/main/java/org/apache/flink/yarn/Utils.java | 2 +-
52 files changed, 510 insertions(+), 456 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 7bbdfd5e404..86d147f6c00 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -90,7 +90,7 @@
<td><h5>kubernetes.flink.conf.dir</h5></td>
<td style="word-wrap: break-word;">"/opt/flink/conf"</td>
<td>String</td>
- <td>The flink conf directory that will be mounted in pod. The
flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten
from config map.</td>
+ <td>The flink conf directory that will be mounted in pod. The
config.yaml, log4j.properties, logback.xml in this path will be overwritten
from config map.</td>
</tr>
<tr>
<td><h5>kubernetes.flink.log.dir</h5></td>
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html
b/docs/layouts/shortcodes/generated/python_configuration.html
index 60ef6a9676e..d99c1f2a3b9 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -18,7 +18,7 @@
<td><h5>python.client.executable</h5></td>
<td style="word-wrap: break-word;">"python"</td>
<td>String</td>
- <td>The path of the Python interpreter used to launch the Python
process when submitting the Python jobs via "flink run" or compiling the
Java/Scala jobs containing Python UDFs. Equivalent to the command line option
"-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The
priority is as following: <br />1. the configuration 'python.client.executable'
defined in the source code(Only used in Flink Java SQL/Table API job call
Python UDF);<br />2. the command li [...]
+ <td>The path of the Python interpreter used to launch the Python
process when submitting the Python jobs via "flink run" or compiling the
Java/Scala jobs containing Python UDFs. Equivalent to the command line option
"-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The
priority is as following: <br />1. the configuration 'python.client.executable'
defined in the source code(Only used in Flink Java SQL/Table API job call
Python UDF);<br />2. the command li [...]
</tr>
<tr>
<td><h5>python.executable</h5></td>
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index 5cd25cda3f8..b6ad67234e0 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -30,7 +30,7 @@
<td><h5>state.backend.rocksdb.memory.fixed-per-tm</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
- <td>The fixed total amount of memory, shared among all RocksDB
instances per Task Manager (cluster-level option). This option only takes
effect if neither 'state.backend.rocksdb.memory.managed' nor
'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is
configured then each RocksDB column family state has its own memory caches (as
controlled by the column family options). The relevant options for the shared
resources (e.g. write-buffer-ratio) can be set o [...]
+ <td>The fixed total amount of memory, shared among all RocksDB
instances per Task Manager (cluster-level option). This option only takes
effect if neither 'state.backend.rocksdb.memory.managed' nor
'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is
configured then each RocksDB column family state has its own memory caches (as
controlled by the column family options). The relevant options for the shared
resources (e.g. write-buffer-ratio) can be set o [...]
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td>
diff --git
a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
index 2dd92d7c0e0..7d917bb7ae8 100644
--- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
@@ -18,7 +18,7 @@
<td><h5>state.backend.rocksdb.memory.fixed-per-tm</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
- <td>The fixed total amount of memory, shared among all RocksDB
instances per Task Manager (cluster-level option). This option only takes
effect if neither 'state.backend.rocksdb.memory.managed' nor
'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is
configured then each RocksDB column family state has its own memory caches (as
controlled by the column family options). The relevant options for the shared
resources (e.g. write-buffer-ratio) can be set o [...]
+ <td>The fixed total amount of memory, shared among all RocksDB
instances per Task Manager (cluster-level option). This option only takes
effect if neither 'state.backend.rocksdb.memory.managed' nor
'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is
configured then each RocksDB column family state has its own memory caches (as
controlled by the column family options). The relevant options for the shared
resources (e.g. write-buffer-ratio) can be set o [...]
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td>
diff --git
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index 0cf31d5cabc..67a53405d24 100644
---
a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++
b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -55,7 +55,7 @@ public final class HadoopUtils {
/**
* Returns a new Hadoop Configuration object using the path to the hadoop
conf configured in the
- * main configuration (flink-conf.yaml). This method is public because its
being used in the
+ * main configuration (config.yaml). This method is public because its
being used in the
* HadoopDataSource.
*
* @param flinkConfiguration Flink configuration object
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index 191d89d957f..b099d944892 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -503,7 +503,7 @@ public class RestartStrategies {
/**
* Restart strategy configuration that could be used by jobs to use
cluster level restart
* strategy. Useful especially when one has a custom implementation of
restart strategy set via
- * flink-conf.yaml.
+ * config.yaml.
*/
@PublicEvolving
public static final class FallbackRestartStrategyConfiguration
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ad7857af06d..b7751abd07b 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -391,7 +391,7 @@ public final class ConfigConstants {
/**
* Prefix for passing custom environment variables to Flink's master
process. For example for
* passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
- * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the
flink-conf.yaml.
+ * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the
config.yaml.
*
* @deprecated Use {@link
ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
*/
@@ -482,7 +482,7 @@ public final class ConfigConstants {
/**
* Prefix for passing custom environment variables to Flink's
ApplicationMaster (JobManager).
* For example for passing LD_LIBRARY_PATH as an env variable to the
AppMaster, set:
- * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the
flink-conf.yaml.
+ * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the
config.yaml.
*
* @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}.
*/
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index e846e0a114e..10283e68657 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -276,7 +276,7 @@ public class ResourceManagerOptions {
/**
* Prefix for passing custom environment variables to Flink's master
process. For example for
* passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
- * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the
flink-conf.yaml.
+ * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the
config.yaml.
*/
public static final String CONTAINERIZED_MASTER_ENV_PREFIX =
"containerized.master.env.";
diff --git a/flink-dist/src/main/assemblies/bin.xml
b/flink-dist/src/main/assemblies/bin.xml
index 9753e1db003..3ac51f4562f 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -123,7 +123,7 @@ under the License.
<!-- copy the config file -->
<file>
- <source>src/main/resources/flink-conf.yaml</source>
+ <source>src/main/resources/config.yaml</source>
<outputDirectory>conf</outputDirectory>
<fileMode>0644</fileMode>
</file>
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh
b/flink-dist/src/main/flink-bin/bin/config.sh
index d90e4362f7e..2959f9c5ece 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -112,12 +112,12 @@ readFromConfig() {
}
########################################################################################################################
-# DEFAULT CONFIG VALUES: These values will be used when nothing has been
specified in conf/flink-conf.yaml
+# DEFAULT CONFIG VALUES: These values will be used when nothing has been
specified in conf/config.yaml
# -or- the respective environment variables are not set.
########################################################################################################################
# WARNING !!! , these values are only used if there is nothing else is
specified in
-# conf/flink-conf.yaml
+# conf/config.yaml
DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid
files to
DEFAULT_ENV_LOG_MAX=10 # Maximum number of old
log files to keep
@@ -134,7 +134,7 @@ DEFAULT_HADOOP_CONF_DIR="" #
Hadoop Configuration Direc
DEFAULT_HBASE_CONF_DIR="" # HBase Configuration
Directory, if necessary
########################################################################################################################
-# CONFIG KEYS: The default values can be overwritten by the following keys in
conf/flink-conf.yaml
+# CONFIG KEYS: The default values can be overwritten by the following keys in
conf/config.yaml
########################################################################################################################
KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
@@ -351,7 +351,7 @@ if [ -z "${HIGH_AVAILABILITY}" ]; then
fi
# Arguments for the JVM. Used for job and task manager JVMs.
-# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/config.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and
TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
JVM_ARGS=""
diff --git a/flink-dist/src/main/resources/config.yaml
b/flink-dist/src/main/resources/config.yaml
new file mode 100644
index 00000000000..08aace171ea
--- /dev/null
+++ b/flink-dist/src/main/resources/config.yaml
@@ -0,0 +1,298 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# These parameters are required for Java 17 support.
+# They can be safely removed when using Java 8/11.
+env:
+ java:
+ opts:
+ all: --add-exports=java.base/sun.net.util=ALL-UNNAMED
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNA [...]
+
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager:
+ # The host interface the JobManager will bind to. By default, this is
localhost, and will prevent
+ # the JobManager from communicating outside the machine/container it is
running on.
+ # On YARN this setting will be ignored if it is set to 'localhost',
defaulting to 0.0.0.0.
+ # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+ #
+ # To enable this, set the bind-host address to one that has access to an
outside facing network
+ # interface, such as 0.0.0.0.
+ bind-host: localhost
+ rpc:
+ # The external address of the host on which the JobManager runs and can be
+ # reached by the TaskManagers and any clients which want to connect. This
setting
+ # is only used in Standalone mode and may be overwritten on the JobManager
side
+ # by specifying the --host <hostname> parameter of the bin/jobmanager.sh
executable.
+ # In high availability mode, if you use the bin/start-cluster.sh script
and setup
+ # the conf/masters file, this will be taken care of automatically. Yarn
+ # automatically configure the host name based on the hostname of the node
where the
+ # JobManager runs.
+ address: localhost
+ # The RPC port where the JobManager is reachable.
+ port: 6123
+ memory:
+ process:
+ # The total process memory size for the JobManager.
+ # Note this accounts for all memory usage within the JobManager process,
including JVM metaspace and other overhead.
+ size: 1600m
+ execution:
+ # The failover strategy, i.e., how the job computation recovers from task
failures.
+ # Only restart tasks that may have been affected by the task failure,
which typically includes
+ # downstream tasks and potentially upstream tasks if their produced data
is no longer available for consumption.
+ failover-strategy: region
+
+taskmanager:
+ # The host interface the TaskManager will bind to. By default, this is
localhost, and will prevent
+ # the TaskManager from communicating outside the machine/container it is
running on.
+ # On YARN this setting will be ignored if it is set to 'localhost',
defaulting to 0.0.0.0.
+ # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+ #
+ # To enable this, set the bind-host address to one that has access to an
outside facing network
+ # interface, such as 0.0.0.0.
+ bind-host: localhost
+ # The address of the host on which the TaskManager runs and can be reached
by the JobManager and
+ # other TaskManagers. If not specified, the TaskManager will try different
strategies to identify
+ # the address.
+ #
+ # Note this address needs to be reachable by the JobManager and forward
traffic to one of
+ # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+ #
+ # Note also that unless all TaskManagers are running on the same machine,
this address needs to be
+ # configured separately for each TaskManager.
+ host: localhost
+ # The number of task slots that each TaskManager offers. Each slot runs one
parallel pipeline.
+ numberOfTaskSlots: 1
+ memory:
+ process:
+ # The total process memory size for the TaskManager.
+ #
+ # Note this accounts for all memory usage within the TaskManager
process, including JVM metaspace and other overhead.
+ # To exclude JVM metaspace and overhead, please, use total Flink memory
size instead of 'taskmanager.memory.process.size'.
+ # It is not recommended to set both 'taskmanager.memory.process.size'
and Flink memory.
+ size: 1728m
+
+parallelism:
+ # The parallelism used for programs that did not specify and other
parallelism.
+ default: 1
+
+# # The default file system scheme and authority.
+# # By default file paths without scheme are interpreted relative to the local
+# # root file system 'file:///'. Use this to override the default and interpret
+# # relative paths relative to a different file system,
+# # for example 'hdfs://mynamenode:12345'
+# fs:
+# default-scheme: hdfs://mynamenode:12345
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# high-availability:
+# # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+# type: zookeeper
+# # The path where metadata for master recovery is persisted. While
ZooKeeper stores
+# # the small ground truth for checkpoint and leader election, this location
stores
+# # the larger objects, like persisted dataflow graphs.
+# #
+# # Must be a durable file system that is accessible from all nodes
+# # (like HDFS, S3, Ceph, nfs, ...)
+# storageDir: hdfs:///flink/ha/
+# zookeeper:
+# # The list of ZooKeeper quorum peers that coordinate the
high-availability
+# # setup. This must be a list of the form:
+# # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+# quorum: localhost:2181
+# client:
+# # ACL options are based on
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open"
(ZOO_OPEN_ACL_UNSAFE)
+# # The default value is "open" and it can be changed to "creator" if ZK
security is enabled
+# acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when
execution.checkpointing.interval > 0.
+
+# # Execution checkpointing related parameters. Please refer to
CheckpointConfig and ExecutionCheckpointingOptions for more details.
+# execution:
+# checkpointing:
+# interval: 3min
+# externalized-checkpoint-retention: [DELETE_ON_CANCELLATION,
RETAIN_ON_CANCELLATION]
+# max-concurrent-checkpoints: 1
+# min-pause: 0
+# mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# timeout: 10min
+# tolerable-failed-checkpoints: 0
+# unaligned: false
+
+# state:
+# backend:
+# # Supported backends are 'hashmap', 'rocksdb', or the
+# # <class-name-of-factory>.
+# type: hashmap
+# # Flag to enable/disable incremental checkpoints for backends that
+# # support incremental checkpoints (like the RocksDB state backend).
+# incremental: false
+# checkpoints:
+# # Directory for checkpoints filesystem, when using any of the default
bundled
+# # state backends.
+# dir: hdfs://namenode-host:port/flink-checkpoints
+# savepoints:
+# # Default target directory for savepoints, optional.
+# dir: hdfs://namenode-host:port/flink-savepoints
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+rest:
+ # The address to which the REST client will connect to
+ address: localhost
+ # The address that the REST & web server binds to
+ # By default, this is localhost, which prevents the REST & web server from
+ # being able to communicate outside of the machine/container it is running
on.
+ #
+ # To enable this, set the bind address to one that has access to
outside-facing
+ # network interface, such as 0.0.0.0.
+ bind-address: localhost
+ # # The port to which the REST client connects to. If rest.bind-port has
+ # # not been specified, then the server will bind to this port as well.
+ # port: 8081
+ # # Port range for the REST and web server to bind to.
+ # bind-port: 8080-8090
+
+# web:
+# submit:
+# # Flag to specify whether job submission is enabled from the web-based
+# # runtime monitor. Uncomment to disable.
+# enable: false
+# cancel:
+# # Flag to specify whether job cancellation is enabled from the web-based
+# # runtime monitor. Uncomment to disable.
+# enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# io:
+# tmp:
+# # Override the directories for temporary files. If not specified, the
+# # system-specific Java temporary directory (java.io.tmpdir property) is
taken.
+# #
+# # For framework setups on Yarn, Flink will automatically pick up the
+# # containers' temp directories without any need for configuration.
+# #
+# # Add a delimited list for multiple directories, using the system
directory
+# # delimiter (colon ':' on unix) or a comma, e.g.:
+# # /data1/tmp:/data2/tmp:/data3/tmp
+# #
+# # Note: Each directory entry is read from and written to by a different
I/O
+# # thread. You can include the same directory multiple times in order to
create
+# # multiple I/O threads against that directory. This is for example
relevant for
+# # high-throughput RAIDs.
+# dirs: /tmp
+
+# classloader:
+# resolve:
+# # The classloading resolve order. Possible values are 'child-first'
(Flink's default)
+# # and 'parent-first' (Java's default).
+# #
+# # Child first classloading allows users to use different
dependency/library
+# # versions in their application than those in the classpath. Switching
back
+# # to 'parent-first' may help with debugging dependency issues.
+# order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager:
+# memory:
+# network:
+# fraction: 0.1
+# min: 64mb
+# max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and
connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# # The below configure how Kerberos credentials are provided. A keytab will
be used instead of
+# # a ticket cache if the keytab path and principal are set.
+# security:
+# kerberos:
+# login:
+# use-ticket-cache: true
+# keytab: /path/to/kerberos/keytab
+# principal: flink-user
+# # The configuration below defines which JAAS login contexts
+# contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# zookeeper:
+# sasl:
+# # Below configurations are applicable if ZK ensemble is configured for
security
+# #
+# # Override below configuration to provide custom ZK service name if
configured
+# # zookeeper.sasl.service-name: zookeeper
+# #
+# # The configuration below must match one of the values set in
"security.kerberos.login.contexts"
+# login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh
(start|stop)
+#
+# jobmanager:
+# archive:
+# fs:
+# # Directory to upload completed jobs to. Add this directory to the
list of
+# # monitored directories of the HistoryServer as well (see below).
+# dir: hdfs:///completed-jobs/
+
+# historyserver:
+# web:
+# # The address under which the web-based HistoryServer listens.
+# address: 0.0.0.0
+# # The port under which the web-based HistoryServer listens.
+# port: 8082
+# archive:
+# fs:
+# # Comma separated list of directories to monitor for completed jobs.
+# dir: hdfs:///completed-jobs/
+# # Interval in milliseconds for refreshing the monitored directories.
+# fs.refresh-interval: 10000
+
diff --git a/flink-dist/src/main/resources/flink-conf.yaml
b/flink-dist/src/main/resources/flink-conf.yaml
deleted file mode 100644
index b5aa2794dd9..00000000000
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ /dev/null
@@ -1,311 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# These parameters are required for Java 17 support.
-# They can be safely removed when using Java 8/11.
-env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=java.security.jgss/sun.security.krb5= [...]
-
-#==============================================================================
-# Common
-#==============================================================================
-
-# The external address of the host on which the JobManager runs and can be
-# reached by the TaskManagers and any clients which want to connect. This
setting
-# is only used in Standalone mode and may be overwritten on the JobManager side
-# by specifying the --host <hostname> parameter of the bin/jobmanager.sh
executable.
-# In high availability mode, if you use the bin/start-cluster.sh script and
setup
-# the conf/masters file, this will be taken care of automatically. Yarn
-# automatically configure the host name based on the hostname of the node
where the
-# JobManager runs.
-
-jobmanager.rpc.address: localhost
-
-# The RPC port where the JobManager is reachable.
-
-jobmanager.rpc.port: 6123
-
-# The host interface the JobManager will bind to. By default, this is
localhost, and will prevent
-# the JobManager from communicating outside the machine/container it is
running on.
-# On YARN this setting will be ignored if it is set to 'localhost', defaulting
to 0.0.0.0.
-# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
-#
-# To enable this, set the bind-host address to one that has access to an
outside facing network
-# interface, such as 0.0.0.0.
-
-jobmanager.bind-host: localhost
-
-
-# The total process memory size for the JobManager.
-#
-# Note this accounts for all memory usage within the JobManager process,
including JVM metaspace and other overhead.
-
-jobmanager.memory.process.size: 1600m
-
-# The host interface the TaskManager will bind to. By default, this is
localhost, and will prevent
-# the TaskManager from communicating outside the machine/container it is
running on.
-# On YARN this setting will be ignored if it is set to 'localhost', defaulting
to 0.0.0.0.
-# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
-#
-# To enable this, set the bind-host address to one that has access to an
outside facing network
-# interface, such as 0.0.0.0.
-
-taskmanager.bind-host: localhost
-
-# The address of the host on which the TaskManager runs and can be reached by
the JobManager and
-# other TaskManagers. If not specified, the TaskManager will try different
strategies to identify
-# the address.
-#
-# Note this address needs to be reachable by the JobManager and forward
traffic to one of
-# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
-#
-# Note also that unless all TaskManagers are running on the same machine, this
address needs to be
-# configured separately for each TaskManager.
-
-taskmanager.host: localhost
-
-# The total process memory size for the TaskManager.
-#
-# Note this accounts for all memory usage within the TaskManager process,
including JVM metaspace and other overhead.
-
-taskmanager.memory.process.size: 1728m
-
-# To exclude JVM metaspace and overhead, please, use total Flink memory size
instead of 'taskmanager.memory.process.size'.
-# It is not recommended to set both 'taskmanager.memory.process.size' and
Flink memory.
-#
-# taskmanager.memory.flink.size: 1280m
-
-# The number of task slots that each TaskManager offers. Each slot runs one
parallel pipeline.
-
-taskmanager.numberOfTaskSlots: 1
-
-# The parallelism used for programs that did not specify and other parallelism.
-
-parallelism.default: 1
-
-# The default file system scheme and authority.
-#
-# By default file paths without scheme are interpreted relative to the local
-# root file system 'file:///'. Use this to override the default and interpret
-# relative paths relative to a different file system,
-# for example 'hdfs://mynamenode:12345'
-#
-# fs.default-scheme
-
-#==============================================================================
-# High Availability
-#==============================================================================
-
-# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
-#
-# high-availability.type: zookeeper
-
-# The path where metadata for master recovery is persisted. While ZooKeeper
stores
-# the small ground truth for checkpoint and leader election, this location
stores
-# the larger objects, like persisted dataflow graphs.
-#
-# Must be a durable file system that is accessible from all nodes
-# (like HDFS, S3, Ceph, nfs, ...)
-#
-# high-availability.storageDir: hdfs:///flink/ha/
-
-# The list of ZooKeeper quorum peers that coordinate the high-availability
-# setup. This must be a list of the form:
-# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
-#
-# high-availability.zookeeper.quorum: localhost:2181
-
-
-# ACL options are based on
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
-# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open"
(ZOO_OPEN_ACL_UNSAFE)
-# The default value is "open" and it can be changed to "creator" if ZK
security is enabled
-#
-# high-availability.zookeeper.client.acl: open
-
-#==============================================================================
-# Fault tolerance and checkpointing
-#==============================================================================
-
-# The backend that will be used to store operator state checkpoints if
-# checkpointing is enabled. Checkpointing is enabled when
execution.checkpointing.interval > 0.
-#
-# Execution checkpointing related parameters. Please refer to CheckpointConfig
and ExecutionCheckpointingOptions for more details.
-#
-# execution.checkpointing.interval: 3min
-# execution.checkpointing.externalized-checkpoint-retention:
[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
-# execution.checkpointing.max-concurrent-checkpoints: 1
-# execution.checkpointing.min-pause: 0
-# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
-# execution.checkpointing.timeout: 10min
-# execution.checkpointing.tolerable-failed-checkpoints: 0
-# execution.checkpointing.unaligned: false
-#
-# Supported backends are 'hashmap', 'rocksdb', or the
-# <class-name-of-factory>.
-#
-# state.backend.type: hashmap
-
-# Directory for checkpoints filesystem, when using any of the default bundled
-# state backends.
-#
-# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
-
-# Default target directory for savepoints, optional.
-#
-# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
-
-# Flag to enable/disable incremental checkpoints for backends that
-# support incremental checkpoints (like the RocksDB state backend).
-#
-# state.backend.incremental: false
-
-# The failover strategy, i.e., how the job computation recovers from task
failures.
-# Only restart tasks that may have been affected by the task failure, which
typically includes
-# downstream tasks and potentially upstream tasks if their produced data is no
longer available for consumption.
-
-jobmanager.execution.failover-strategy: region
-
-#==============================================================================
-# Rest & web frontend
-#==============================================================================
-
-# The port to which the REST client connects to. If rest.bind-port has
-# not been specified, then the server will bind to this port as well.
-#
-#rest.port: 8081
-
-# The address to which the REST client will connect to
-#
-rest.address: localhost
-
-# Port range for the REST and web server to bind to.
-#
-#rest.bind-port: 8080-8090
-
-# The address that the REST & web server binds to
-# By default, this is localhost, which prevents the REST & web server from
-# being able to communicate outside of the machine/container it is running on.
-#
-# To enable this, set the bind address to one that has access to outside-facing
-# network interface, such as 0.0.0.0.
-#
-rest.bind-address: localhost
-
-# Flag to specify whether job submission is enabled from the web-based
-# runtime monitor. Uncomment to disable.
-
-#web.submit.enable: false
-
-# Flag to specify whether job cancellation is enabled from the web-based
-# runtime monitor. Uncomment to disable.
-
-#web.cancel.enable: false
-
-#==============================================================================
-# Advanced
-#==============================================================================
-
-# Override the directories for temporary files. If not specified, the
-# system-specific Java temporary directory (java.io.tmpdir property) is taken.
-#
-# For framework setups on Yarn, Flink will automatically pick up the
-# containers' temp directories without any need for configuration.
-#
-# Add a delimited list for multiple directories, using the system directory
-# delimiter (colon ':' on unix) or a comma, e.g.:
-# /data1/tmp:/data2/tmp:/data3/tmp
-#
-# Note: Each directory entry is read from and written to by a different I/O
-# thread. You can include the same directory multiple times in order to create
-# multiple I/O threads against that directory. This is for example relevant for
-# high-throughput RAIDs.
-#
-# io.tmp.dirs: /tmp
-
-# The classloading resolve order. Possible values are 'child-first' (Flink's
default)
-# and 'parent-first' (Java's default).
-#
-# Child first classloading allows users to use different dependency/library
-# versions in their application than those in the classpath. Switching back
-# to 'parent-first' may help with debugging dependency issues.
-#
-# classloader.resolve-order: child-first
-
-# The amount of memory going to the network stack. These numbers usually need
-# no tuning. Adjusting them may be necessary in case of an "Insufficient number
-# of network buffers" error. The default min is 64MB, the default max is 1GB.
-#
-# taskmanager.memory.network.fraction: 0.1
-# taskmanager.memory.network.min: 64mb
-# taskmanager.memory.network.max: 1gb
-
-#==============================================================================
-# Flink Cluster Security Configuration
-#==============================================================================
-
-# Kerberos authentication for various components - Hadoop, ZooKeeper, and
connectors -
-# may be enabled in four steps:
-# 1. configure the local krb5.conf file
-# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
-# 3. make the credentials available to various JAAS login contexts
-# 4. configure the connector to use JAAS/SASL
-
-# The below configure how Kerberos credentials are provided. A keytab will be
used instead of
-# a ticket cache if the keytab path and principal are set.
-
-# security.kerberos.login.use-ticket-cache: true
-# security.kerberos.login.keytab: /path/to/kerberos/keytab
-# security.kerberos.login.principal: flink-user
-
-# The configuration below defines which JAAS login contexts
-
-# security.kerberos.login.contexts: Client,KafkaClient
-
-#==============================================================================
-# ZK Security Configuration
-#==============================================================================
-
-# Below configurations are applicable if ZK ensemble is configured for security
-
-# Override below configuration to provide custom ZK service name if configured
-# zookeeper.sasl.service-name: zookeeper
-
-# The configuration below must match one of the values set in
"security.kerberos.login.contexts"
-# zookeeper.sasl.login-context-name: Client
-
-#==============================================================================
-# HistoryServer
-#==============================================================================
-
-# The HistoryServer is started and stopped via bin/historyserver.sh
(start|stop)
-
-# Directory to upload completed jobs to. Add this directory to the list of
-# monitored directories of the HistoryServer as well (see below).
-#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
-
-# The address under which the web-based HistoryServer listens.
-#historyserver.web.address: 0.0.0.0
-
-# The port under which the web-based HistoryServer listens.
-#historyserver.web.port: 8082
-
-# Comma separated list of directories to monitor for completed jobs.
-#historyserver.archive.fs.dir: hdfs:///completed-jobs/
-
-# Interval in milliseconds for refreshing the monitored directories.
-#historyserver.archive.fs.refresh-interval: 10000
-
diff --git
a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
index 744ae1dccdd..df86e666a20 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
@@ -18,15 +18,24 @@
package org.apache.flink.dist;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.util.bash.BashJavaUtils;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -49,6 +58,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
private static final String RUN_EXTRACT_LOGGING_OUTPUTS_SCRIPT =
"src/test/bin/runExtractLoggingOutputs.sh";
+ @TempDir private Path tmpDir;
+
@Test
void testGetTmResourceParamsConfigs() throws Exception {
int expectedResultLines = 2;
@@ -122,7 +133,7 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
@Test
void testGetConfiguration() throws Exception {
- int expectedResultLines = 13;
+ int expectedResultLines = 26;
String[] commands = {
RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -135,7 +146,7 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
@Test
void testGetConfigurationRemoveKey() throws Exception {
- int expectedResultLines = 12;
+ int expectedResultLines = 24;
String[] commands = {
RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -146,12 +157,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
List<String> lines =
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
assertThat(lines).hasSize(expectedResultLines);
- assertThat(lines).doesNotContain("parallelism.default: 1");
+ Configuration configuration = loadConfiguration(lines);
+
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty();
}
@Test
void testGetConfigurationRemoveKeyValue() throws Exception {
- int expectedResultLines = 12;
+ int expectedResultLines = 24;
String[] commands = {
RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -162,12 +174,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
List<String> lines =
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
assertThat(lines).hasSize(expectedResultLines);
- assertThat(lines).doesNotContain("parallelism.default: 1");
+ Configuration configuration = loadConfiguration(lines);
+
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty();
}
@Test
void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception
{
- int expectedResultLines = 13;
+ int expectedResultLines = 26;
String[] commands = {
RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -178,12 +191,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
List<String> lines =
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
assertThat(lines).hasSize(expectedResultLines);
- assertThat(lines).contains("parallelism.default: 1");
+ Configuration configuration = loadConfiguration(lines);
+
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1);
}
@Test
void testGetConfigurationReplaceKeyValue() throws Exception {
- int expectedResultLines = 13;
+ int expectedResultLines = 26;
String[] commands = {
RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -194,13 +208,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
List<String> lines =
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
assertThat(lines).hasSize(expectedResultLines);
- assertThat(lines).doesNotContain("parallelism.default: 1");
- assertThat(lines).contains("parallelism.default: 2");
+ Configuration configuration = loadConfiguration(lines);
+
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(2);
}
@Test
void testGetConfigurationReplaceKeyValueNotMatchingValue() throws
Exception {
- int expectedResultLines = 13;
+ int expectedResultLines = 26;
String[] commands = {
RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(),
@@ -211,7 +225,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
List<String> lines =
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
assertThat(lines).hasSize(expectedResultLines);
- assertThat(lines).doesNotContain("parallelism.default: 3");
+ Configuration configuration = loadConfiguration(lines);
+
assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1);
}
private static Map<String, String> parseAndAssertDynamicParameters(
@@ -274,4 +289,16 @@ class BashJavaUtilsITCase extends JavaBashTestBase {
assertThat(actualOutput).isEqualTo(expectedOutput);
}
+
+ private Configuration loadConfiguration(List<String> lines) throws
IOException {
+ File file =
+ TempDirUtils.newFile(
+ tmpDir.toAbsolutePath(),
GlobalConfiguration.FLINK_CONF_FILENAME);
+ try (final PrintWriter pw = new PrintWriter(file)) {
+ for (String line : lines) {
+ pw.println(line);
+ }
+ }
+ return
GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+ }
}
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 5969f0850d9..92c9986e394 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -418,11 +418,9 @@ public final class FlinkDistribution {
mergedConfig.addAll(defaultConfig);
mergedConfig.addAll(config);
- // NOTE: Before we change the default conf file in the flink-dist to
'config.yaml', we
- // need to use the legacy flink conf file 'flink-conf.yaml' here.
Files.write(
- conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME),
- ConfigurationUtils.convertConfigToWritableLines(mergedConfig,
true));
+ conf.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+ ConfigurationUtils.convertConfigToWritableLines(mergedConfig,
false));
}
public void setTaskExecutorHosts(Collection<String> taskExecutorHosts)
throws IOException {
diff --git a/flink-end-to-end-tests/test-scripts/common.sh
b/flink-end-to-end-tests/test-scripts/common.sh
old mode 100644
new mode 100755
index a7c4ce5a68a..d3452b4b1bb
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -50,6 +50,16 @@ TEST_INFRA_DIR=`pwd -P`
cd $TEST_ROOT
source "${TEST_INFRA_DIR}/common_utils.sh"
+source "${FLINK_DIR}/bin/bash-java-utils.sh"
+
+if [[ -z "${FLINK_CONF_DIR:-}" ]]; then
+ FLINK_CONF_DIR="$FLINK_DIR/conf"
+fi
+FLINK_CONF=${FLINK_CONF_DIR}/config.yaml
+# Flatten the configuration file config.yaml to enable end-to-end test cases
which will modify
+# it directly through shell scripts.
+output=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_DIR}/bin"
"${FLINK_DIR}/lib" -flatten)
+echo "$output" > $FLINK_CONF
NODENAME=${NODENAME:-"localhost"}
@@ -143,14 +153,14 @@ function swap_planner_scala_with_planner_loader() {
function delete_config_key() {
local config_key=$1
- sed -i -e "/^${config_key}: /d" ${FLINK_DIR}/conf/flink-conf.yaml
+ sed -i -e "/^${config_key}: /d" $FLINK_CONF
}
function set_config_key() {
local config_key=$1
local value=$2
delete_config_key ${config_key}
- echo "$config_key: $value" >> $FLINK_DIR/conf/flink-conf.yaml
+ echo "$config_key: $value" >> $FLINK_CONF
}
function create_ha_config() {
@@ -166,7 +176,7 @@ function create_ha_config() {
# This must have all the masters to be used in HA.
echo "localhost:8081" > ${FLINK_DIR}/conf/masters
- # then move on to create the flink-conf.yaml
+ # then move on to create the config.yaml
#==============================================================================
# Common
#==============================================================================
@@ -688,7 +698,7 @@ function setup_flink_slf4j_metric_reporter() {
METRIC_NAME_PATTERN="${1:-"*"}"
set_config_key "metrics.reporter.slf4j.factory.class"
"org.apache.flink.metrics.slf4j.Slf4jReporterFactory"
set_config_key "metrics.reporter.slf4j.interval" "1 SECONDS"
- set_config_key "metrics.reporter.slf4j.filter.includes"
"*:${METRIC_NAME_PATTERN}"
+ set_config_key "metrics.reporter.slf4j.filter.includes"
"'*:${METRIC_NAME_PATTERN}'"
}
function get_job_exceptions {
diff --git a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
index 97fcca09d5a..ec582c05f88 100755
--- a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh
@@ -131,10 +131,10 @@ security.kerberos.login.principal: hadoop-user
slot.request.timeout: 120000
END
)
- docker exec master bash -c "echo \"${FLINK_CONFIG}\" >
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+ docker exec master bash -c "echo \"${FLINK_CONFIG}\" >
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
echo "Flink config:"
- docker exec master bash -c "cat
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+ docker exec master bash -c "cat
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
}
function debug_copy_and_show_logs {
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index b9134ebcb4a..a078e84da6b 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -67,9 +67,13 @@ on_exit test_clean_up
cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
-echo "taskmanager.memory.task.off-heap.size: 768m" >>
"${TEST_DATA_DIR}/conf/flink-conf.yaml"
-echo "taskmanager.memory.process.size: 3172m" >>
"${TEST_DATA_DIR}/conf/flink-conf.yaml"
-echo "taskmanager.numberOfTaskSlots: 5" >>
"${TEST_DATA_DIR}/conf/flink-conf.yaml"
+# standard yaml do not allow duplicate keys
+sed -i -e "/^taskmanager.memory.task.off-heap.size: /d"
"${TEST_DATA_DIR}/conf/config.yaml"
+sed -i -e "/^taskmanager.memory.process.size: /d"
"${TEST_DATA_DIR}/conf/config.yaml"
+sed -i -e "/^taskmanager.numberOfTaskSlots: /d"
"${TEST_DATA_DIR}/conf/config.yaml"
+echo "taskmanager.memory.task.off-heap.size: 768m" >>
"${TEST_DATA_DIR}/conf/config.yaml"
+echo "taskmanager.memory.process.size: 3172m" >>
"${TEST_DATA_DIR}/conf/config.yaml"
+echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/config.yaml"
export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf"
FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P`
diff --git
a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
index 1274a38851c..bb9e456a06d 100755
---
a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
+++
b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh
@@ -81,7 +81,7 @@ if [[ ! "${YARN_APPLICATION_LOGS}" =~ "Receive initial
delegation tokens from re
fi
echo "Running Job without configured keytab, the exception you see below is
expected"
-docker exec master bash -c "echo \"\" >
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+docker exec master bash -c "echo \"\" >
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
# verify that it doesn't work if we don't configure a keytab
docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
/home/hadoop-user/${FLINK_DIRNAME}/bin/flink run-application \
diff --git
a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
index f80e0635224..37e839103a9 100755
--- a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
+++ b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh
@@ -66,7 +66,7 @@ else
fi
echo "Running Job without configured keytab, the exception you see below is
expected"
-docker exec master bash -c "echo \"\" >
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml"
+docker exec master bash -c "echo \"\" >
/home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
# verify that it doesn't work if we don't configure a keytab
docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
/home/hadoop-user/${FLINK_DIRNAME}/bin/flink run \
diff --git
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
index 31e89dacac4..badd86072d8 100644
---
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
+++
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
@@ -112,7 +112,7 @@ public abstract class ParquetVectorizedInputFormat<T,
SplitT extends FileSourceS
final long splitLength = split.length();
// Using Flink FileSystem instead of Hadoop FileSystem directly, so we
can get the hadoop
- // config that create inputFile needed from flink-conf.yaml
+ // config that create inputFile needed from config.yaml
final FileSystem fs = filePath.getFileSystem();
final ParquetInputFile inputFile =
new ParquetInputFile(fs.open(filePath),
fs.getFileStatus(filePath).getLen());
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d67893e4d21..3a12968663e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1564,7 +1564,7 @@ public abstract class DataSet<T> {
* dataset.writeAsText("file:///path1"); }</pre>
* <li>A directory is always created when <a
*
href="https://nightlies.apache.org/flink/flink-docs-master/setup/config.html#file-systems">fs.output.always-create-directory</a>
- * is set to true in flink-conf.yaml file, even when parallelism is
set to 1.
+ * is set to true in config.yaml file, even when parallelism is set
to 1.
* <pre>{@code .
* └── path1/
* └── 1 }</pre>
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 27382e41be4..8165ed2e5d2 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -301,7 +301,7 @@ public class KubernetesConfigOptions {
.stringType()
.defaultValue("/opt/flink/conf")
.withDescription(
- "The flink conf directory that will be mounted in
pod. The flink-conf.yaml, log4j.properties, "
+ "The flink conf directory that will be mounted in
pod. The config.yaml, log4j.properties, "
+ "logback.xml in this path will be
overwritten from config map.");
public static final ConfigOption<String> FLINK_LOG_DIR =
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 2203770e267..3c3ee9a5154 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -64,7 +64,7 @@ import static
org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration
on the JobManager or
+ * Mounts the log4j.properties, logback.xml, and config.yaml configuration on
the JobManager or
* TaskManager pod.
*/
public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index c6b335449a8..21afbefc287 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -78,7 +78,7 @@ public interface KubernetesParameters {
*/
List<Map<String, String>> getTolerations();
- /** Directory in Pod that stores the flink-conf.yaml, log4j.properties,
and the logback.xml. */
+ /** Directory in Pod that stores the config.yaml, log4j.properties, and
the logback.xml. */
String getFlinkConfDirInPod();
/** Directory in Pod that saves the log files. */
diff --git a/flink-python/dev/dev-requirements.txt
b/flink-python/dev/dev-requirements.txt
index 2d74afd58c2..f692a63f0f8 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -32,3 +32,4 @@ pemja==0.4.1; platform_system != 'Windows'
httplib2>=0.19.0
protobuf>=3.19.0
pytest~=7.0
+pyyaml>=6.0.1
diff --git a/flink-python/pyflink/common/configuration.py
b/flink-python/pyflink/common/configuration.py
index 54e82962f43..35416b8fa47 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -69,10 +69,21 @@ class Configuration:
jars_key =
jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key =
jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
- add_jars_to_context_class_loader(value.split(";"))
+ jar_urls = Configuration.parse_jars_value(value, jvm)
+ add_jars_to_context_class_loader(jar_urls)
self._j_configuration.setString(key, value)
return self
+ @staticmethod
+ def parse_jars_value(value: str, jvm):
+ is_standard_yaml =
jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
+ if is_standard_yaml:
+ import yaml
+ jar_urls_list = yaml.safe_load(value)
+ if isinstance(jar_urls_list, list):
+ return jar_urls_list
+ return value.split(";")
+
def get_integer(self, key: str, default_value: int) -> int:
"""
Returns the value associated with the given key as an integer.
diff --git a/flink-python/pyflink/common/restart_strategy.py
b/flink-python/pyflink/common/restart_strategy.py
index b1c4621bf3b..52962e138e6 100644
--- a/flink-python/pyflink/common/restart_strategy.py
+++ b/flink-python/pyflink/common/restart_strategy.py
@@ -152,7 +152,7 @@ class RestartStrategies(object):
"""
Restart strategy configuration that could be used by jobs to use
cluster level restart
strategy. Useful especially when one has a custom implementation of
restart strategy set via
- flink-conf.yaml.
+ config.yaml.
"""
def __init__(self, j_restart_strategy=None):
diff --git a/flink-python/pyflink/datastream/state_backend.py
b/flink-python/pyflink/datastream/state_backend.py
index 6ffe21912be..7b4c69917bb 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -280,7 +280,7 @@ class EmbeddedRocksDBStateBackend(StateBackend):
Sets the predefined options for RocksDB.
If user-configured options within ``RocksDBConfigurableOptions`` is
set (through
- flink-conf.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
+ config.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
then the options from the factory are applied on top of the here
specified
predefined options and customized options.
@@ -301,7 +301,7 @@ class EmbeddedRocksDBStateBackend(StateBackend):
are :data:`PredefinedOptions.DEFAULT`.
If user-configured options within ``RocksDBConfigurableOptions`` is
set (through
- flink-conf.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
+ config.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
then the options from the factory are applied on top of the predefined
and customized
options.
@@ -320,7 +320,7 @@ class EmbeddedRocksDBStateBackend(StateBackend):
The options created by the factory here are applied on top of the
pre-defined
options profile selected via :func:`set_predefined_options` and
user-configured
- options from configuration set through flink-conf.yaml with keys in
+ options from configuration set through config.yaml with keys in
``RocksDBConfigurableOptions``.
:param options_factory_class_name: The fully-qualified class name of
the options
@@ -383,7 +383,7 @@ class MemoryStateBackend(StateBackend):
>> env.set_state_backend(HashMapStateBackend())
>>
env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())
- If you are configuring your state backend via the `flink-conf.yaml` please
make the following
+ If you are configuring your state backend via the `config.yaml` please
make the following
changes.
```
@@ -535,7 +535,7 @@ class FsStateBackend(StateBackend):
>> env.set_state_backend(HashMapStateBackend())
>>
env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints")
- If you are configuring your state backend via the `flink-conf.yaml` please
set your state
+ If you are configuring your state backend via the `config.yaml` please set
your state
backend type to `hashmap`.
This state backend holds the working state in the memory (JVM heap) of the
TaskManagers.
@@ -717,7 +717,7 @@ class RocksDBStateBackend(StateBackend):
>> env.set_state_backend(EmbeddedRocksDBStateBackend())
>>
env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints")
- If you are configuring your state backend via the `flink-conf.yaml` no
changes are required.
+ If you are configuring your state backend via the `config.yaml` no changes
are required.
A State Backend that stores its state in ``RocksDB``. This state backend
can
store very large state that exceeds memory and spills to disk.
@@ -862,7 +862,7 @@ class RocksDBStateBackend(StateBackend):
Sets the predefined options for RocksDB.
If user-configured options within ``RocksDBConfigurableOptions`` is
set (through
- flink-conf.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
+ config.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
then the options from the factory are applied on top of the here
specified
predefined options and customized options.
@@ -882,7 +882,7 @@ class RocksDBStateBackend(StateBackend):
are :data:`PredefinedOptions.DEFAULT`.
If user-configured options within ``RocksDBConfigurableOptions`` is
set (through
- flink-conf.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
+ config.yaml) or a user-defined options factory is set (via
:func:`setOptions`),
then the options from the factory are applied on top of the predefined
and customized
options.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py
b/flink-python/pyflink/datastream/stream_execution_environment.py
index d1b7ce41118..20fcf85d7ed 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -842,7 +842,7 @@ class StreamExecutionEnvironment(object):
method returns a local execution environment.
When executed from the command line the given configuration is stacked
on top of the
- global configuration which comes from the flink-conf.yaml, potentially
overriding
+ global configuration which comes from the config.yaml, potentially
overriding
duplicated options.
:param configuration: The configuration to instantiate the environment
with.
diff --git a/flink-python/pyflink/pyflink_gateway_server.py
b/flink-python/pyflink/pyflink_gateway_server.py
index dcb3f34884e..09b77231ef7 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -43,29 +43,43 @@ def on_windows():
return platform.system() == "Windows"
-def read_from_config(key, default_value, flink_conf_file):
- value = default_value
- # get the realpath of tainted path value to avoid CWE22 problem that
constructs a path or URI
- # using the tainted value and might allow an attacker to access, modify,
or test the existence
- # of critical or sensitive files.
- with open(os.path.realpath(flink_conf_file), "r") as f:
- while True:
- line = f.readline()
- if not line:
- break
- if line.startswith("#") or len(line.strip()) == 0:
- continue
- k, v = line.split(":", 1)
- if k.strip() == key:
- value = v.strip()
- return value
+def read_from_config(key, default_value, flink_conf_directory):
+ import yaml
+ # try to find flink-conf.yaml file in flink_conf_directory
+ flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
+ if os.path.isfile(flink_conf_file):
+ # If flink-conf.yaml exists, use the old parsing logic to read the
value
+ # get the realpath of tainted path value to avoid CWE22 problem that
constructs a path
+ # or URI using the tainted value and might allow an attacker to
access, modify, or test
+ # the existence of critical or sensitive files.
+ with open(os.path.realpath(flink_conf_file), "r") as f:
+ while True:
+ line = f.readline()
+ if not line:
+ break
+ if line.startswith("#") or len(line.strip()) == 0:
+ continue
+ k, v = line.split(":", 1)
+ if k.strip() == key:
+ return v.strip()
+ else:
+ # If flink-conf.yaml does not exist, try to find config.yaml instead
+ config_file = os.path.join(flink_conf_directory, "config.yaml")
+ if os.path.isfile(config_file):
+ # If config.yaml exists, use YAML parser to read the value
+ with open(os.path.realpath(config_file), "r") as f:
+ config = yaml.safe_load(f)
+ return config.get(key, default_value)
+
+ # If neither file exists, return the default value
+ return default_value
def find_java_executable():
java_executable = "java.exe" if on_windows() else "java"
flink_home = _find_flink_home()
- flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
- java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+ flink_conf_directory = os.path.join(flink_home, "conf")
+ java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_directory)
if java_home is None and "JAVA_HOME" in os.environ:
java_home = os.environ["JAVA_HOME"]
@@ -120,13 +134,12 @@ def construct_log_settings(env):
flink_home = os.path.realpath(_find_flink_home())
flink_conf_dir = env['FLINK_CONF_DIR']
- flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
if "FLINK_LOG_DIR" in env:
flink_log_dir = env["FLINK_LOG_DIR"]
else:
flink_log_dir = read_from_config(
- KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+ KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"),
env['FLINK_CONF_DIR'])
if "LOG4J_PROPERTIES" in env:
log4j_properties = env["LOG4J_PROPERTIES"]
@@ -156,14 +169,13 @@ def construct_log_settings(env):
def get_jvm_opts(env):
- flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
jvm_opts = env.get("FLINK_ENV_JAVA_OPTS")
if jvm_opts is None:
- default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "",
flink_conf_file)
+ default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "",
env['FLINK_CONF_DIR'])
extra_jvm_opts = read_from_config(
KEY_ENV_JAVA_OPTS,
- read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "",
flink_conf_file),
- flink_conf_file)
+ read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "",
env['FLINK_CONF_DIR']),
+ env['FLINK_CONF_DIR'])
jvm_opts = default_jvm_opts + " " + extra_jvm_opts
# Remove leading and trailing double quotes (if present) of value
@@ -194,8 +206,6 @@ def construct_flink_classpath(env):
def construct_hadoop_classpath(env):
- flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
-
hadoop_conf_dir = ""
if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
if os.path.isdir("/etc/hadoop/conf"):
@@ -212,11 +222,11 @@ def construct_hadoop_classpath(env):
return os.pathsep.join(
[env.get("HADOOP_CLASSPATH", ""),
env.get("YARN_CONF_DIR",
- read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+ read_from_config(KEY_ENV_YARN_CONF_DIR, "",
env['FLINK_CONF_DIR'])),
env.get("HADOOP_CONF_DIR",
- read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir,
flink_conf_file)),
+ read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir,
env['FLINK_CONF_DIR'])),
env.get("HBASE_CONF_DIR",
- read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir,
flink_conf_file))])
+ read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir,
env['FLINK_CONF_DIR']))])
def construct_test_classpath(env):
diff --git a/flink-python/pyflink/table/table_config.py
b/flink-python/pyflink/table/table_config.py
index bd12a447823..ba17767c776 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -36,7 +36,7 @@ class TableConfig(object):
This class is a pure API class that abstracts configuration from various
sources. Currently,
configuration can be set in any of the following layers (in the given
order):
- - flink-conf.yaml
+ - config.yaml
- CLI parameters
- :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to
DataStream API
- :func:`~EnvironmentSettings.Builder.with_configuration`
@@ -106,8 +106,8 @@ class TableConfig(object):
jars_key =
jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
classpaths_key =
jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
if key in [jars_key, classpaths_key]:
- add_jars_to_context_class_loader(value.split(";"))
-
+ jar_urls = Configuration.parse_jars_value(value, jvm)
+ add_jars_to_context_class_loader(jar_urls)
return self
def get_local_timezone(self) -> str:
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index 3327261ac41..ebfcc7cb095 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1540,23 +1540,33 @@ class TableEnvironment(object):
j_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(),
sys.executable)
def _add_jars_to_j_env_config(self, config_key):
- jvm = get_gateway().jvm
jar_urls = self.get_config().get(config_key, None)
- if jar_urls is not None:
- # normalize
+
+ if jar_urls:
+ jvm = get_gateway().jvm
jar_urls_list = []
- for url in jar_urls.split(";"):
- url = url.strip()
- if url != "":
- jar_urls_list.append(jvm.java.net.URL(url).toString())
+ parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm)
+ url_strings = [
+ jvm.java.net.URL(url).toString() if url else ""
+ for url in parsed_jar_urls
+ ]
+ self._parse_urls(url_strings, jar_urls_list)
+
j_configuration = get_j_env_configuration(self._get_j_env())
- if j_configuration.containsKey(config_key):
- for url in j_configuration.getString(config_key,
"").split(";"):
- url = url.strip()
- if url != "" and url not in jar_urls_list:
- jar_urls_list.append(url)
+ parsed_jar_urls = Configuration.parse_jars_value(
+ j_configuration.getString(config_key, ""),
+ jvm
+ )
+ self._parse_urls(parsed_jar_urls, jar_urls_list)
+
j_configuration.setString(config_key, ";".join(jar_urls_list))
+ def _parse_urls(self, jar_urls, jar_urls_list):
+ for url in jar_urls:
+ url = url.strip()
+ if url != "" and url not in jar_urls_list:
+ jar_urls_list.append(url)
+
def _get_j_env(self):
return self._j_tenv.getPlanner().getExecEnv()
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 7161354be9f..24637fe4c7d 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -325,7 +325,9 @@ try:
'pandas>=1.3.0',
'pyarrow>=5.0.0',
'pemja==0.4.1;platform_system != "Windows"',
- 'httplib2>=0.19.0', apache_flink_libraries_dependency]
+ 'httplib2>=0.19.0',
+ 'pyyaml>=6.0.1',
+ apache_flink_libraries_dependency]
setup(
name='apache-flink',
diff --git
a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
index d7959c425e9..c58267e6fc9 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java
@@ -48,7 +48,7 @@ public class PythonConfig implements ReadableConfig {
}
/**
- * Configuration adopted from the outer layer, e.g. flink-conf.yaml,
command line arguments,
+ * Configuration adopted from the outer layer, e.g. config.yaml, command
line arguments,
* TableConfig, etc.
*/
private final ReadableConfig configuration;
diff --git
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 17f425cebb8..142457bd19c 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -182,7 +182,7 @@ public class PythonOptions {
.text("2. the command line option
\"-pyclientexec\";")
.linebreak()
.text(
- "3. the configuration
'python.client.executable' defined in flink-conf.yaml")
+ "3. the configuration
'python.client.executable' defined in config.yaml")
.linebreak()
.text("4. the environment variable
PYFLINK_CLIENT_EXECUTABLE;")
.build());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
index d0185c5f0ee..8ea11a65ba3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java
@@ -46,8 +46,8 @@ public final class RestartBackoffTimeStrategyFactoryLoader {
* <li>Strategy set within job graph, i.e. {@link
* RestartStrategies.RestartStrategyConfiguration}, unless the
config is {@link
* RestartStrategies.FallbackRestartStrategyConfiguration}.
- * <li>Strategy set in the cluster(server-side) config
(flink-conf.yaml), unless the strategy
- * is not specified
+ * <li>Strategy set in the cluster(server-side) config (config.yaml),
unless the strategy is
+ * not specified
* <li>{@link
*
FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory}
if
* checkpointing is enabled. Otherwise {@link
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
index ea4f007449b..df4809714ef 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
@@ -79,7 +79,7 @@ public class CheckpointStorageLoader {
if (logger != null) {
logger.debug(
"The configuration {} has not be set in the current"
- + " sessions flink-conf.yaml. Falling back to
a default CheckpointStorage"
+ + " sessions config.yaml. Falling back to a
default CheckpointStorage"
+ " type. Users are strongly encouraged
explicitly set this configuration"
+ " so they understand how their applications
are checkpointing"
+ " snapshots for fault-tolerance.",
@@ -138,7 +138,7 @@ public class CheckpointStorageLoader {
* StreamExecutionEnvironment}.
*
* <p>3) Use the {@link CheckpointStorage} instance configured via the
clusters
- * <b>flink-conf.yaml</b>.
+ * <b>config.yaml</b>.
*
* <p>4) Load a default {@link CheckpointStorage} instance.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index aded04d7b6b..ec219432faa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -67,7 +67,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
* }</pre>
*
- * <p>If you are configuring your state backend via the {@code
flink-conf.yaml} please make the
+ * <p>If you are configuring your state backend via the {@code config.yaml}
please make the
* following changes set your state backend type to "hashmap" {@code
state.backend.type: hashmap}.
*
* <p>This state backend holds the working state in the memory (JVM heap) of
the TaskManagers. The
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 87d8d77803e..6345c2b4602 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -59,7 +59,7 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
* env.getCheckpointConfig().setCheckpointStorage(new
JobManagerCheckpointStorage());
* }</pre>
*
- * <p>If you are configuring your state backend via the {@code
flink-conf.yaml} please make the
+ * <p>If you are configuring your state backend via the {@code config.yaml}
please make the
* following changes:
*
* <pre>{@code
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 048515b5b44..6d15c53c1a2 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -698,7 +698,7 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
* Sets the predefined options for RocksDB.
*
* <p>If user-configured options within {@link RocksDBConfigurableOptions}
is set (through
- * flink-conf.yaml) or a user-defined options factory is set (via {@link
+ * config.yaml) or a user-defined options factory is set (via {@link
* #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the
factory are applied on
* top of the here specified predefined options and customized options.
*
@@ -714,7 +714,7 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
* PredefinedOptions#DEFAULT}.
*
* <p>If user-configured options within {@link RocksDBConfigurableOptions}
is set (through
- * flink-conf.yaml) of a user-defined options factory is set (via {@link
+ * config.yaml) of a user-defined options factory is set (via {@link
* #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the
factory are applied on
* top of the predefined and customized options.
*
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 69576520fc3..e78d9fb0eb5 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -145,7 +145,7 @@ public class RocksDBOptions {
+ "This option only takes effect
if neither '%s' nor '%s' are not configured. If none is configured "
+ "then each RocksDB column family
state has its own memory caches (as controlled by the column "
+ "family options). "
- + "The relevant options for the
shared resources (e.g. write-buffer-ratio) can be set on the same level
(flink-conf.yaml)."
+ + "The relevant options for the
shared resources (e.g. write-buffer-ratio) can be set on the same level
(config.yaml)."
+ "Note, that this feature breaks
resource isolation between the slots",
USE_MANAGED_MEMORY.key(),
FIX_PER_SLOT_MEMORY_SIZE.key()));
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 0c37a1e6bc5..dd3fcb70cf4 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -61,8 +61,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
*
env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints");
* }</pre>
*
- * <p>If you are configuring your state backend via the {@code
flink-conf.yaml} no changes are
- * required.
+ * <p>If you are configuring your state backend via the {@code config.yaml} no
changes are required.
*
* <p>A State Backend that stores its state in {@code RocksDB}. This state
backend can store very
* large state that exceeds memory and spills to disk.
@@ -399,7 +398,7 @@ public class RocksDBStateBackend extends
AbstractManagedMemoryStateBackend
* Sets the predefined options for RocksDB.
*
* <p>If user-configured options within {@link RocksDBConfigurableOptions}
is set (through
- * flink-conf.yaml) or a user-defined options factory is set (via {@link
+ * config.yaml) or a user-defined options factory is set (via {@link
* #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the
factory are applied on
* top of the here specified predefined options and customized options.
*
@@ -415,7 +414,7 @@ public class RocksDBStateBackend extends
AbstractManagedMemoryStateBackend
* PredefinedOptions#DEFAULT}.
*
* <p>If user-configured options within {@link RocksDBConfigurableOptions}
is set (through
- * flink-conf.yaml) of a user-defined options factory is set (via {@link
+ * config.yaml) of a user-defined options factory is set (via {@link
* #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the
factory are applied on
* top of the predefined and customized options.
*
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 82a42dfdb3c..34d27e1bb59 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -249,7 +249,7 @@ public class RocksDBStateBackendConfigTest {
env.close();
}
- /** Validates that user custom configuration from code should override the
flink-conf.yaml. */
+ /** Validates that user custom configuration from code should override the
config.yaml. */
@Test
public void testConfigureTimerServiceLoadingFromApplication() throws
Exception {
final MockEnvironment env = new MockEnvironmentBuilder().build();
@@ -524,7 +524,7 @@ public class RocksDBStateBackendConfigTest {
// verify that we would use PredefinedOptions.DEFAULT by default.
assertEquals(PredefinedOptions.DEFAULT,
rocksDbBackend.getPredefinedOptions());
- // verify that user could configure predefined options via
flink-conf.yaml
+ // verify that user could configure predefined options via config.yaml
Configuration configuration = new Configuration();
configuration.set(
RocksDBOptions.PREDEFINED_OPTIONS,
PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
@@ -640,7 +640,7 @@ public class RocksDBStateBackendConfigTest {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new
RocksDBStateBackend(checkpointPath);
- // verify that user-defined options factory could be configured via
flink-conf.yaml
+ // verify that user-defined options factory could be configured via
config.yaml
Configuration config = new Configuration();
config.setString(RocksDBOptions.OPTIONS_FACTORY.key(),
TestOptionsFactory.class.getName());
config.setString(TestOptionsFactory.BACKGROUND_JOBS_OPTION.key(), "4");
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f14f3d4400a..b179ead220d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2591,7 +2591,7 @@ public class StreamExecutionEnvironment implements
AutoCloseable {
* execution environment, as returned by {@link
#createLocalEnvironment(Configuration)}.
*
* <p>When executed from the command line the given configuration is
stacked on top of the
- * global configuration which comes from the {@code flink-conf.yaml},
potentially overriding
+ * global configuration which comes from the {@code config.yaml},
potentially overriding
* duplicated options.
*
* @param configuration The configuration to instantiate the environment
with.
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index 551e05ef962..eae416cd0a6 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -127,8 +127,8 @@ public class DefaultContext {
//
-------------------------------------------------------------------------------------------
/**
- * Build the {@link DefaultContext} from flink-conf.yaml, dynamic
configuration and users
- * specified jars.
+ * Build the {@link DefaultContext} from config.yaml, dynamic
configuration and users specified
+ * jars.
*
* @param dynamicConfig user specified configuration.
* @param dependencies user specified jars
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
index 0c3e3ede7a6..65daeb4a3a8 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
@@ -43,7 +43,7 @@ public class SqlGatewayRestEndpointTestUtils {
return rebuildRestEndpointOptions(context.getEndpointOptions());
}
- /** Create the configuration generated from flink-conf.yaml. */
+ /** Create the configuration generated from config.yaml. */
public static Configuration getFlinkConfig(
String address, String bindAddress, String portRange) {
final Configuration config = new Configuration();
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
index e5d22ed943e..8ca455d31b4 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java
@@ -184,7 +184,7 @@ class SessionContextTest {
CatalogListener1.class.getName(),
CatalogListener2.class.getName()));
- // Find and create listeners from flink-conf.yaml for session
+ // Find and create listeners from config.yaml for session
flinkConfig.set(
TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS,
Arrays.asList(CatalogFactory1.IDENTIFIER,
CatalogFactory2.IDENTIFIER));
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index f80ad6393ec..3d3ffe29cca 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -50,7 +50,7 @@ import static
org.apache.flink.table.api.internal.TableConfigValidation.validate
* configuration can be set in any of the following layers (in the given
order):
*
* <ol>
- * <li>{@code flink-conf.yaml},
+ * <li>{@code config.yaml},
* <li>CLI parameters,
* <li>{@code StreamExecutionEnvironment} when bridging to DataStream API,
* <li>{@link EnvironmentSettings.Builder#withConfiguration(Configuration)}
/ {@link
@@ -103,7 +103,7 @@ public final class TableConfig implements WritableConfig,
ReadableConfig {
// Note to implementers:
// TableConfig is a ReadableConfig which is built once the
TableEnvironment is created and
- // contains both the configuration defined in the execution context
(flink-conf.yaml + CLI
+ // contains both the configuration defined in the execution context
(config.yaml + CLI
// params), stored in rootConfiguration, but also any extra configuration
defined by the user in
// the application, which has precedence over the execution configuration.
//
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
index cd2bee6b39c..f3c79c4b35d 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java
@@ -283,8 +283,8 @@ public class FlinkContainersSettings {
}
/**
- * Sets a single Flink configuration parameter (the options for
flink-conf.yaml) and returns
- * a reference to this Builder enabling method chaining.
+ * Sets a single Flink configuration parameter (the options for
config.yaml) and returns a
+ * reference to this Builder enabling method chaining.
*
* @param <T> The type parameter.
* @param option The option.
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
index 38b542e1310..1ee0c9f6999 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java
@@ -125,7 +125,7 @@ public class FlinkImageBuilder {
}
/**
- * Sets Flink configuration. This configuration will be used for
generating flink-conf.yaml for
+ * Sets Flink configuration. This configuration will be used for
generating config.yaml for
* configuring JobManager and TaskManager.
*/
public FlinkImageBuilder setConfiguration(Configuration conf) {
@@ -209,12 +209,10 @@ public class FlinkImageBuilder {
final Path flinkConfFile = createTemporaryFlinkConfFile(conf,
tempDirectory);
final Path log4jPropertiesFile =
createTemporaryLog4jPropertiesFile(tempDirectory);
- // Copy flink-conf.yaml into image
- // NOTE: Before we change the default conf file in the flink-dist
to 'config.yaml', we
- // need to use the legacy flink conf file 'flink-conf.yaml' here.
+ // Copy config.yaml into image
filesToCopy.put(
flinkConfFile,
- Paths.get(flinkHome, "conf",
GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME));
+ Paths.get(flinkHome, "conf",
GlobalConfiguration.FLINK_CONF_FILENAME));
filesToCopy.put(
log4jPropertiesFile, Paths.get(flinkHome, "conf",
LOG4J_PROPERTIES_FILENAME));
@@ -292,13 +290,10 @@ public class FlinkImageBuilder {
private Path createTemporaryFlinkConfFile(Configuration
finalConfiguration, Path tempDirectory)
throws IOException {
- // Create a temporary flink-conf.yaml file and write merged
configurations into it
- // NOTE: Before we change the default conf file in the flink-dist to
'config.yaml', we
- // need to use the legacy flink conf file 'flink-conf.yaml' here.
- Path flinkConfFile =
tempDirectory.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME);
+ Path flinkConfFile =
tempDirectory.resolve(GlobalConfiguration.FLINK_CONF_FILENAME);
Files.write(
flinkConfFile,
-
ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, true));
+
ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, false));
return flinkConfFile;
}
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index fcf32ea13be..ec7c6905ee7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -821,10 +821,9 @@ public abstract class YarnTestBase {
File flinkConfDirPath =
TestUtils.findFile(
- // NOTE: Before we change the default conf file in
the flink-dist to
- // 'config.yaml', we need to use the legacy flink
conf file
- // 'flink-conf.yaml' here.
- flinkDistRootDir, new ContainsName(new String[]
{"flink-conf.yaml"}));
+ flinkDistRootDir,
+ new ContainsName(
+ new String[]
{GlobalConfiguration.FLINK_CONF_FILENAME}));
assertThat(flinkConfDirPath).isNotNull();
final String confDirPath =
flinkConfDirPath.getParentFile().getAbsolutePath();
@@ -839,7 +838,8 @@ public abstract class YarnTestBase {
FileUtils.copyDirectory(new File(confDirPath),
tempConfPathForSecureRun);
BootstrapTools.writeConfiguration(
- globalConfiguration, new File(tempConfPathForSecureRun,
"flink-conf.yaml"));
+ globalConfiguration,
+ new File(tempConfPathForSecureRun,
GlobalConfiguration.FLINK_CONF_FILENAME));
String configDir = tempConfPathForSecureRun.getAbsolutePath();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 4f81077b16e..5d9c77695e0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -485,7 +485,7 @@ public final class Utils {
*
* @param flinkConfig The Flink configuration.
* @param tmParams Parameters for the task manager.
- * @param configDirectory The configuration directory for the
flink-conf.yaml
+ * @param configDirectory The configuration directory for the config.yaml
* @param logDirectory The log directory.
* @param hasLogback Uses logback?
* @param hasLog4j Uses log4j?