This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9721ce835f5 [FLINK-33577][dist] Change the default config file to config.yaml in flink-dist. 9721ce835f5 is described below commit 9721ce835f5a7f28f2ad187346e009633307097b Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Wed Aug 23 22:48:30 2023 +0800 [FLINK-33577][dist] Change the default config file to config.yaml in flink-dist. This closes #24177. --- .../generated/kubernetes_config_configuration.html | 2 +- .../shortcodes/generated/python_configuration.html | 2 +- .../generated/rocksdb_configuration.html | 2 +- .../generated/state_backend_rocksdb_section.html | 2 +- .../api/java/hadoop/mapred/utils/HadoopUtils.java | 2 +- .../common/restartstrategy/RestartStrategies.java | 2 +- .../flink/configuration/ConfigConstants.java | 4 +- .../configuration/ResourceManagerOptions.java | 2 +- flink-dist/src/main/assemblies/bin.xml | 2 +- flink-dist/src/main/flink-bin/bin/config.sh | 8 +- flink-dist/src/main/resources/config.yaml | 298 ++++++++++++++++++++ flink-dist/src/main/resources/flink-conf.yaml | 311 --------------------- .../org/apache/flink/dist/BashJavaUtilsITCase.java | 51 +++- .../flink/tests/util/flink/FlinkDistribution.java | 6 +- flink-end-to-end-tests/test-scripts/common.sh | 18 +- .../test-scripts/common_yarn_docker.sh | 4 +- .../test-scripts/test_pyflink.sh | 10 +- .../test_yarn_application_kerberos_docker.sh | 2 +- .../test-scripts/test_yarn_job_kerberos_docker.sh | 2 +- .../parquet/ParquetVectorizedInputFormat.java | 2 +- .../java/org/apache/flink/api/java/DataSet.java | 2 +- .../configuration/KubernetesConfigOptions.java | 2 +- .../decorators/FlinkConfMountDecorator.java | 2 +- .../parameters/KubernetesParameters.java | 2 +- flink-python/dev/dev-requirements.txt | 1 + flink-python/pyflink/common/configuration.py | 13 +- flink-python/pyflink/common/restart_strategy.py | 2 +- flink-python/pyflink/datastream/state_backend.py | 16 +- .../datastream/stream_execution_environment.py | 
2 +- flink-python/pyflink/pyflink_gateway_server.py | 68 +++-- flink-python/pyflink/table/table_config.py | 6 +- flink-python/pyflink/table/table_environment.py | 34 ++- flink-python/setup.py | 4 +- .../java/org/apache/flink/python/PythonConfig.java | 2 +- .../org/apache/flink/python/PythonOptions.java | 2 +- .../RestartBackoffTimeStrategyFactoryLoader.java | 4 +- .../runtime/state/CheckpointStorageLoader.java | 4 +- .../runtime/state/filesystem/FsStateBackend.java | 2 +- .../runtime/state/memory/MemoryStateBackend.java | 2 +- .../state/EmbeddedRocksDBStateBackend.java | 4 +- .../contrib/streaming/state/RocksDBOptions.java | 2 +- .../streaming/state/RocksDBStateBackend.java | 7 +- .../state/RocksDBStateBackendConfigTest.java | 6 +- .../environment/StreamExecutionEnvironment.java | 2 +- .../gateway/service/context/DefaultContext.java | 4 +- .../rest/util/SqlGatewayRestEndpointTestUtils.java | 2 +- .../service/context/SessionContextTest.java | 2 +- .../org/apache/flink/table/api/TableConfig.java | 4 +- .../container/FlinkContainersSettings.java | 4 +- .../testframe/container/FlinkImageBuilder.java | 15 +- .../java/org/apache/flink/yarn/YarnTestBase.java | 10 +- .../src/main/java/org/apache/flink/yarn/Utils.java | 2 +- 52 files changed, 510 insertions(+), 456 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index 7bbdfd5e404..86d147f6c00 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -90,7 +90,7 @@ <td><h5>kubernetes.flink.conf.dir</h5></td> <td style="word-wrap: break-word;">"/opt/flink/conf"</td> <td>String</td> - <td>The flink conf directory that will be mounted in pod. 
The flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.</td> + <td>The flink conf directory that will be mounted in pod. The config.yaml, log4j.properties, logback.xml in this path will be overwritten from config map.</td> </tr> <tr> <td><h5>kubernetes.flink.log.dir</h5></td> diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html index 60ef6a9676e..d99c1f2a3b9 100644 --- a/docs/layouts/shortcodes/generated/python_configuration.html +++ b/docs/layouts/shortcodes/generated/python_configuration.html @@ -18,7 +18,7 @@ <td><h5>python.client.executable</h5></td> <td style="word-wrap: break-word;">"python"</td> <td>String</td> - <td>The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs. Equivalent to the command line option "-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The priority is as following: <br />1. the configuration 'python.client.executable' defined in the source code(Only used in Flink Java SQL/Table API job call Python UDF);<br />2. the command li [...] + <td>The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs. Equivalent to the command line option "-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The priority is as following: <br />1. the configuration 'python.client.executable' defined in the source code(Only used in Flink Java SQL/Table API job call Python UDF);<br />2. the command li [...] 
</tr> <tr> <td><h5>python.executable</h5></td> diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html index 5cd25cda3f8..b6ad67234e0 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html @@ -30,7 +30,7 @@ <td><h5>state.backend.rocksdb.memory.fixed-per-tm</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>MemorySize</td> - <td>The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is configured then each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set o [...] + <td>The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is configured then each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set o [...] 
</tr> <tr> <td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td> diff --git a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html index 2dd92d7c0e0..7d917bb7ae8 100644 --- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html +++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html @@ -18,7 +18,7 @@ <td><h5>state.backend.rocksdb.memory.fixed-per-tm</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>MemorySize</td> - <td>The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is configured then each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set o [...] + <td>The fixed total amount of memory, shared among all RocksDB instances per Task Manager (cluster-level option). This option only takes effect if neither 'state.backend.rocksdb.memory.managed' nor 'state.backend.rocksdb.memory.fixed-per-slot' are not configured. If none is configured then each RocksDB column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set o [...] 
</tr> <tr> <td><h5>state.backend.rocksdb.memory.high-prio-pool-ratio</h5></td> diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java index 0cf31d5cabc..67a53405d24 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java +++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -55,7 +55,7 @@ public final class HadoopUtils { /** * Returns a new Hadoop Configuration object using the path to the hadoop conf configured in the - * main configuration (flink-conf.yaml). This method is public because its being used in the + * main configuration (config.yaml). This method is public because its being used in the * HadoopDataSource. * * @param flinkConfiguration Flink configuration object diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index 191d89d957f..b099d944892 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -503,7 +503,7 @@ public class RestartStrategies { /** * Restart strategy configuration that could be used by jobs to use cluster level restart * strategy. Useful especially when one has a custom implementation of restart strategy set via - * flink-conf.yaml. + * config.yaml. 
*/ @PublicEvolving public static final class FallbackRestartStrategyConfiguration diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index ad7857af06d..b7751abd07b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -391,7 +391,7 @@ public final class ConfigConstants { /** * Prefix for passing custom environment variables to Flink's master process. For example for * passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml. + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the config.yaml. * * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead. */ @@ -482,7 +482,7 @@ public final class ConfigConstants { /** * Prefix for passing custom environment variables to Flink's ApplicationMaster (JobManager). * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml. + * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the config.yaml. * * @deprecated Please use {@code CONTAINERIZED_MASTER_ENV_PREFIX}. */ diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java index e846e0a114e..10283e68657 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -276,7 +276,7 @@ public class ResourceManagerOptions { /** * Prefix for passing custom environment variables to Flink's master process. 
For example for * passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: - * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the flink-conf.yaml. + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" in the config.yaml. */ public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index 9753e1db003..3ac51f4562f 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -123,7 +123,7 @@ under the License. <!-- copy the config file --> <file> - <source>src/main/resources/flink-conf.yaml</source> + <source>src/main/resources/config.yaml</source> <outputDirectory>conf</outputDirectory> <fileMode>0644</fileMode> </file> diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index d90e4362f7e..2959f9c5ece 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -112,12 +112,12 @@ readFromConfig() { } ######################################################################################################################## -# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml +# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/config.yaml # -or- the respective environment variables are not set. ######################################################################################################################## # WARNING !!! 
, these values are only used if there is nothing else is specified in -# conf/flink-conf.yaml +# conf/config.yaml DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to DEFAULT_ENV_LOG_MAX=10 # Maximum number of old log files to keep @@ -134,7 +134,7 @@ DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Direc DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary ######################################################################################################################## -# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml +# CONFIG KEYS: The default values can be overwritten by the following keys in conf/config.yaml ######################################################################################################################## KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa" @@ -351,7 +351,7 @@ if [ -z "${HIGH_AVAILABILITY}" ]; then fi # Arguments for the JVM. Used for job and task manager JVMs. -# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys +# DO NOT USE FOR MEMORY SETTINGS! Use conf/config.yaml with keys # JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that! if [ -z "${JVM_ARGS}" ]; then JVM_ARGS="" diff --git a/flink-dist/src/main/resources/config.yaml b/flink-dist/src/main/resources/config.yaml new file mode 100644 index 00000000000..08aace171ea --- /dev/null +++ b/flink-dist/src/main/resources/config.yaml @@ -0,0 +1,298 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# These parameters are required for Java 17 support. +# They can be safely removed when using Java 8/11. +env: + java: + opts: + all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNA [...] + +#============================================================================== +# Common +#============================================================================== + +jobmanager: + # The host interface the JobManager will bind to. By default, this is localhost, and will prevent + # the JobManager from communicating outside the machine/container it is running on. + # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. + # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. + # + # To enable this, set the bind-host address to one that has access to an outside facing network + # interface, such as 0.0.0.0. + bind-host: localhost + rpc: + # The external address of the host on which the JobManager runs and can be + # reached by the TaskManagers and any clients which want to connect. 
This setting + # is only used in Standalone mode and may be overwritten on the JobManager side + # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable. + # In high availability mode, if you use the bin/start-cluster.sh script and setup + # the conf/masters file, this will be taken care of automatically. Yarn + # automatically configure the host name based on the hostname of the node where the + # JobManager runs. + address: localhost + # The RPC port where the JobManager is reachable. + port: 6123 + memory: + process: + # The total process memory size for the JobManager. + # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead. + size: 1600m + execution: + # The failover strategy, i.e., how the job computation recovers from task failures. + # Only restart tasks that may have been affected by the task failure, which typically includes + # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. + failover-strategy: region + +taskmanager: + # The host interface the TaskManager will bind to. By default, this is localhost, and will prevent + # the TaskManager from communicating outside the machine/container it is running on. + # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. + # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. + # + # To enable this, set the bind-host address to one that has access to an outside facing network + # interface, such as 0.0.0.0. + bind-host: localhost + # The address of the host on which the TaskManager runs and can be reached by the JobManager and + # other TaskManagers. If not specified, the TaskManager will try different strategies to identify + # the address. + # + # Note this address needs to be reachable by the JobManager and forward traffic to one of + # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host'). 
+ # + # Note also that unless all TaskManagers are running on the same machine, this address needs to be + # configured separately for each TaskManager. + host: localhost + # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. + numberOfTaskSlots: 1 + memory: + process: + # The total process memory size for the TaskManager. + # + # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. + # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. + # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. + size: 1728m + +parallelism: + # The parallelism used for programs that did not specify and other parallelism. + default: 1 + +# # The default file system scheme and authority. +# # By default file paths without scheme are interpreted relative to the local +# # root file system 'file:///'. Use this to override the default and interpret +# # relative paths relative to a different file system, +# # for example 'hdfs://mynamenode:12345' +# fs: +# default-scheme: hdfs://mynamenode:12345 + +#============================================================================== +# High Availability +#============================================================================== + +# high-availability: +# # The high-availability mode. Possible options are 'NONE' or 'zookeeper'. +# type: zookeeper +# # The path where metadata for master recovery is persisted. While ZooKeeper stores +# # the small ground truth for checkpoint and leader election, this location stores +# # the larger objects, like persisted dataflow graphs. +# # +# # Must be a durable file system that is accessible from all nodes +# # (like HDFS, S3, Ceph, nfs, ...) +# storageDir: hdfs:///flink/ha/ +# zookeeper: +# # The list of ZooKeeper quorum peers that coordinate the high-availability +# # setup. 
This must be a list of the form: +# # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) +# quorum: localhost:2181 +# client: +# # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# # The default value is "open" and it can be changed to "creator" if ZK security is enabled +# acl: open + +#============================================================================== +# Fault tolerance and checkpointing +#============================================================================== + +# The backend that will be used to store operator state checkpoints if +# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0. + +# # Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details. +# execution: +# checkpointing: +# interval: 3min +# externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION] +# max-concurrent-checkpoints: 1 +# min-pause: 0 +# mode: [EXACTLY_ONCE, AT_LEAST_ONCE] +# timeout: 10min +# tolerable-failed-checkpoints: 0 +# unaligned: false + +# state: +# backend: +# # Supported backends are 'hashmap', 'rocksdb', or the +# # <class-name-of-factory>. +# type: hashmap +# # Flag to enable/disable incremental checkpoints for backends that +# # support incremental checkpoints (like the RocksDB state backend). +# incremental: false +# checkpoints: +# # Directory for checkpoints filesystem, when using any of the default bundled +# # state backends. +# dir: hdfs://namenode-host:port/flink-checkpoints +# savepoints: +# # Default target directory for savepoints, optional. 
+# dir: hdfs://namenode-host:port/flink-savepoints + +#============================================================================== +# Rest & web frontend +#============================================================================== + +rest: + # The address to which the REST client will connect to + address: localhost + # The address that the REST & web server binds to + # By default, this is localhost, which prevents the REST & web server from + # being able to communicate outside of the machine/container it is running on. + # + # To enable this, set the bind address to one that has access to outside-facing + # network interface, such as 0.0.0.0. + bind-address: localhost + # # The port to which the REST client connects to. If rest.bind-port has + # # not been specified, then the server will bind to this port as well. + # port: 8081 + # # Port range for the REST and web server to bind to. + # bind-port: 8080-8090 + +# web: +# submit: +# # Flag to specify whether job submission is enabled from the web-based +# # runtime monitor. Uncomment to disable. +# enable: false +# cancel: +# # Flag to specify whether job cancellation is enabled from the web-based +# # runtime monitor. Uncomment to disable. +# enable: false + +#============================================================================== +# Advanced +#============================================================================== + +# io: +# tmp: +# # Override the directories for temporary files. If not specified, the +# # system-specific Java temporary directory (java.io.tmpdir property) is taken. +# # +# # For framework setups on Yarn, Flink will automatically pick up the +# # containers' temp directories without any need for configuration. 
+# # +# # Add a delimited list for multiple directories, using the system directory +# # delimiter (colon ':' on unix) or a comma, e.g.: +# # /data1/tmp:/data2/tmp:/data3/tmp +# # +# # Note: Each directory entry is read from and written to by a different I/O +# # thread. You can include the same directory multiple times in order to create +# # multiple I/O threads against that directory. This is for example relevant for +# # high-throughput RAIDs. +# dirs: /tmp + +# classloader: +# resolve: +# # The classloading resolve order. Possible values are 'child-first' (Flink's default) +# # and 'parent-first' (Java's default). +# # +# # Child first classloading allows users to use different dependency/library +# # versions in their application than those in the classpath. Switching back +# # to 'parent-first' may help with debugging dependency issues. +# order: child-first + +# The amount of memory going to the network stack. These numbers usually need +# no tuning. Adjusting them may be necessary in case of an "Insufficient number +# of network buffers" error. The default min is 64MB, the default max is 1GB. +# +# taskmanager: +# memory: +# network: +# fraction: 0.1 +# min: 64mb +# max: 1gb + +#============================================================================== +# Flink Cluster Security Configuration +#============================================================================== + +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. configure the connector to use JAAS/SASL + +# # The below configure how Kerberos credentials are provided. A keytab will be used instead of +# # a ticket cache if the keytab path and principal are set. 
+# security: +# kerberos: +# login: +# use-ticket-cache: true +# keytab: /path/to/kerberos/keytab +# principal: flink-user +# # The configuration below defines which JAAS login contexts +# contexts: Client,KafkaClient + +#============================================================================== +# ZK Security Configuration +#============================================================================== + +# zookeeper: +# sasl: +# # Below configurations are applicable if ZK ensemble is configured for security +# # +# # Override below configuration to provide custom ZK service name if configured +# # zookeeper.sasl.service-name: zookeeper +# # +# # The configuration below must match one of the values set in "security.kerberos.login.contexts" +# login-context-name: Client + +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) +# +# jobmanager: +# archive: +# fs: +# # Directory to upload completed jobs to. Add this directory to the list of +# # monitored directories of the HistoryServer as well (see below). +# dir: hdfs:///completed-jobs/ + +# historyserver: +# web: +# # The address under which the web-based HistoryServer listens. +# address: 0.0.0.0 +# # The port under which the web-based HistoryServer listens. +# port: 8082 +# archive: +# fs: +# # Comma separated list of directories to monitor for completed jobs. +# dir: hdfs:///completed-jobs/ +# # Interval in milliseconds for refreshing the monitored directories. 
+# fs.refresh-interval: 10000 + diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml deleted file mode 100644 index b5aa2794dd9..00000000000 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ /dev/null @@ -1,311 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# These parameters are required for Java 17 support. -# They can be safely removed when using Java 8/11. -env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5= [...] 
- -#============================================================================== -# Common -#============================================================================== - -# The external address of the host on which the JobManager runs and can be -# reached by the TaskManagers and any clients which want to connect. This setting -# is only used in Standalone mode and may be overwritten on the JobManager side -# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable. -# In high availability mode, if you use the bin/start-cluster.sh script and setup -# the conf/masters file, this will be taken care of automatically. Yarn -# automatically configure the host name based on the hostname of the node where the -# JobManager runs. - -jobmanager.rpc.address: localhost - -# The RPC port where the JobManager is reachable. - -jobmanager.rpc.port: 6123 - -# The host interface the JobManager will bind to. By default, this is localhost, and will prevent -# the JobManager from communicating outside the machine/container it is running on. -# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. -# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. -# -# To enable this, set the bind-host address to one that has access to an outside facing network -# interface, such as 0.0.0.0. - -jobmanager.bind-host: localhost - - -# The total process memory size for the JobManager. -# -# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead. - -jobmanager.memory.process.size: 1600m - -# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent -# the TaskManager from communicating outside the machine/container it is running on. -# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0. -# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0. 
-# -# To enable this, set the bind-host address to one that has access to an outside facing network -# interface, such as 0.0.0.0. - -taskmanager.bind-host: localhost - -# The address of the host on which the TaskManager runs and can be reached by the JobManager and -# other TaskManagers. If not specified, the TaskManager will try different strategies to identify -# the address. -# -# Note this address needs to be reachable by the JobManager and forward traffic to one of -# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host'). -# -# Note also that unless all TaskManagers are running on the same machine, this address needs to be -# configured separately for each TaskManager. - -taskmanager.host: localhost - -# The total process memory size for the TaskManager. -# -# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. - -taskmanager.memory.process.size: 1728m - -# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. -# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. -# -# taskmanager.memory.flink.size: 1280m - -# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. - -taskmanager.numberOfTaskSlots: 1 - -# The parallelism used for programs that did not specify and other parallelism. - -parallelism.default: 1 - -# The default file system scheme and authority. -# -# By default file paths without scheme are interpreted relative to the local -# root file system 'file:///'. 
Use this to override the default and interpret -# relative paths relative to a different file system, -# for example 'hdfs://mynamenode:12345' -# -# fs.default-scheme - -#============================================================================== -# High Availability -#============================================================================== - -# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. -# -# high-availability.type: zookeeper - -# The path where metadata for master recovery is persisted. While ZooKeeper stores -# the small ground truth for checkpoint and leader election, this location stores -# the larger objects, like persisted dataflow graphs. -# -# Must be a durable file system that is accessible from all nodes -# (like HDFS, S3, Ceph, nfs, ...) -# -# high-availability.storageDir: hdfs:///flink/ha/ - -# The list of ZooKeeper quorum peers that coordinate the high-availability -# setup. This must be a list of the form: -# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) -# -# high-availability.zookeeper.quorum: localhost:2181 - - -# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes -# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) -# The default value is "open" and it can be changed to "creator" if ZK security is enabled -# -# high-availability.zookeeper.client.acl: open - -#============================================================================== -# Fault tolerance and checkpointing -#============================================================================== - -# The backend that will be used to store operator state checkpoints if -# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0. -# -# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details. 
-# -# execution.checkpointing.interval: 3min -# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION] -# execution.checkpointing.max-concurrent-checkpoints: 1 -# execution.checkpointing.min-pause: 0 -# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE] -# execution.checkpointing.timeout: 10min -# execution.checkpointing.tolerable-failed-checkpoints: 0 -# execution.checkpointing.unaligned: false -# -# Supported backends are 'hashmap', 'rocksdb', or the -# <class-name-of-factory>. -# -# state.backend.type: hashmap - -# Directory for checkpoints filesystem, when using any of the default bundled -# state backends. -# -# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints - -# Default target directory for savepoints, optional. -# -# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints - -# Flag to enable/disable incremental checkpoints for backends that -# support incremental checkpoints (like the RocksDB state backend). -# -# state.backend.incremental: false - -# The failover strategy, i.e., how the job computation recovers from task failures. -# Only restart tasks that may have been affected by the task failure, which typically includes -# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. - -jobmanager.execution.failover-strategy: region - -#============================================================================== -# Rest & web frontend -#============================================================================== - -# The port to which the REST client connects to. If rest.bind-port has -# not been specified, then the server will bind to this port as well. -# -#rest.port: 8081 - -# The address to which the REST client will connect to -# -rest.address: localhost - -# Port range for the REST and web server to bind to. 
-# -#rest.bind-port: 8080-8090 - -# The address that the REST & web server binds to -# By default, this is localhost, which prevents the REST & web server from -# being able to communicate outside of the machine/container it is running on. -# -# To enable this, set the bind address to one that has access to outside-facing -# network interface, such as 0.0.0.0. -# -rest.bind-address: localhost - -# Flag to specify whether job submission is enabled from the web-based -# runtime monitor. Uncomment to disable. - -#web.submit.enable: false - -# Flag to specify whether job cancellation is enabled from the web-based -# runtime monitor. Uncomment to disable. - -#web.cancel.enable: false - -#============================================================================== -# Advanced -#============================================================================== - -# Override the directories for temporary files. If not specified, the -# system-specific Java temporary directory (java.io.tmpdir property) is taken. -# -# For framework setups on Yarn, Flink will automatically pick up the -# containers' temp directories without any need for configuration. -# -# Add a delimited list for multiple directories, using the system directory -# delimiter (colon ':' on unix) or a comma, e.g.: -# /data1/tmp:/data2/tmp:/data3/tmp -# -# Note: Each directory entry is read from and written to by a different I/O -# thread. You can include the same directory multiple times in order to create -# multiple I/O threads against that directory. This is for example relevant for -# high-throughput RAIDs. -# -# io.tmp.dirs: /tmp - -# The classloading resolve order. Possible values are 'child-first' (Flink's default) -# and 'parent-first' (Java's default). -# -# Child first classloading allows users to use different dependency/library -# versions in their application than those in the classpath. Switching back -# to 'parent-first' may help with debugging dependency issues. 
-# -# classloader.resolve-order: child-first - -# The amount of memory going to the network stack. These numbers usually need -# no tuning. Adjusting them may be necessary in case of an "Insufficient number -# of network buffers" error. The default min is 64MB, the default max is 1GB. -# -# taskmanager.memory.network.fraction: 0.1 -# taskmanager.memory.network.min: 64mb -# taskmanager.memory.network.max: 1gb - -#============================================================================== -# Flink Cluster Security Configuration -#============================================================================== - -# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - -# may be enabled in four steps: -# 1. configure the local krb5.conf file -# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) -# 3. make the credentials available to various JAAS login contexts -# 4. configure the connector to use JAAS/SASL - -# The below configure how Kerberos credentials are provided. A keytab will be used instead of -# a ticket cache if the keytab path and principal are set. 
- -# security.kerberos.login.use-ticket-cache: true -# security.kerberos.login.keytab: /path/to/kerberos/keytab -# security.kerberos.login.principal: flink-user - -# The configuration below defines which JAAS login contexts - -# security.kerberos.login.contexts: Client,KafkaClient - -#============================================================================== -# ZK Security Configuration -#============================================================================== - -# Below configurations are applicable if ZK ensemble is configured for security - -# Override below configuration to provide custom ZK service name if configured -# zookeeper.sasl.service-name: zookeeper - -# The configuration below must match one of the values set in "security.kerberos.login.contexts" -# zookeeper.sasl.login-context-name: Client - -#============================================================================== -# HistoryServer -#============================================================================== - -# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) - -# Directory to upload completed jobs to. Add this directory to the list of -# monitored directories of the HistoryServer as well (see below). -#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ - -# The address under which the web-based HistoryServer listens. -#historyserver.web.address: 0.0.0.0 - -# The port under which the web-based HistoryServer listens. -#historyserver.web.port: 8082 - -# Comma separated list of directories to monitor for completed jobs. -#historyserver.archive.fs.dir: hdfs:///completed-jobs/ - -# Interval in milliseconds for refreshing the monitored directories. 
-#historyserver.archive.fs.refresh-interval: 10000 - diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java index 744ae1dccdd..df86e666a20 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java @@ -18,15 +18,24 @@ package org.apache.flink.dist; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.util.bash.BashJavaUtils; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.shaded.guava32.com.google.common.collect.Sets; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -49,6 +58,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase { private static final String RUN_EXTRACT_LOGGING_OUTPUTS_SCRIPT = "src/test/bin/runExtractLoggingOutputs.sh"; + @TempDir private Path tmpDir; + @Test void testGetTmResourceParamsConfigs() throws Exception { int expectedResultLines = 2; @@ -122,7 +133,7 @@ class BashJavaUtilsITCase extends JavaBashTestBase { @Test void testGetConfiguration() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -135,7 +146,7 @@ class BashJavaUtilsITCase extends JavaBashTestBase { @Test void testGetConfigurationRemoveKey() throws 
Exception { - int expectedResultLines = 12; + int expectedResultLines = 24; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -146,12 +157,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase { List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 1"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty(); } @Test void testGetConfigurationRemoveKeyValue() throws Exception { - int expectedResultLines = 12; + int expectedResultLines = 24; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -162,12 +174,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase { List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 1"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).isEmpty(); } @Test void testGetConfigurationRemoveKeyValueNotMatchingValue() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -178,12 +191,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase { List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).contains("parallelism.default: 1"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1); } @Test void 
testGetConfigurationReplaceKeyValue() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -194,13 +208,13 @@ class BashJavaUtilsITCase extends JavaBashTestBase { List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 1"); - assertThat(lines).contains("parallelism.default: 2"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(2); } @Test void testGetConfigurationReplaceKeyValueNotMatchingValue() throws Exception { - int expectedResultLines = 13; + int expectedResultLines = 26; String[] commands = { RUN_BASH_JAVA_UTILS_CMD_SCRIPT, BashJavaUtils.Command.UPDATE_AND_GET_FLINK_CONFIGURATION.toString(), @@ -211,7 +225,8 @@ class BashJavaUtilsITCase extends JavaBashTestBase { List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); assertThat(lines).hasSize(expectedResultLines); - assertThat(lines).doesNotContain("parallelism.default: 3"); + Configuration configuration = loadConfiguration(lines); + assertThat(configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)).hasValue(1); } private static Map<String, String> parseAndAssertDynamicParameters( @@ -274,4 +289,16 @@ class BashJavaUtilsITCase extends JavaBashTestBase { assertThat(actualOutput).isEqualTo(expectedOutput); } + + private Configuration loadConfiguration(List<String> lines) throws IOException { + File file = + TempDirUtils.newFile( + tmpDir.toAbsolutePath(), GlobalConfiguration.FLINK_CONF_FILENAME); + try (final PrintWriter pw = new PrintWriter(file)) { + for (String line : lines) { + pw.println(line); + } + } + return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString()); + 
} } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java index 5969f0850d9..92c9986e394 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java @@ -418,11 +418,9 @@ public final class FlinkDistribution { mergedConfig.addAll(defaultConfig); mergedConfig.addAll(config); - // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we - // need to use the legacy flink conf file 'flink-conf.yaml' here. Files.write( - conf.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME), - ConfigurationUtils.convertConfigToWritableLines(mergedConfig, true)); + conf.resolve(GlobalConfiguration.FLINK_CONF_FILENAME), + ConfigurationUtils.convertConfigToWritableLines(mergedConfig, false)); } public void setTaskExecutorHosts(Collection<String> taskExecutorHosts) throws IOException { diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh old mode 100644 new mode 100755 index a7c4ce5a68a..d3452b4b1bb --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -50,6 +50,16 @@ TEST_INFRA_DIR=`pwd -P` cd $TEST_ROOT source "${TEST_INFRA_DIR}/common_utils.sh" +source "${FLINK_DIR}/bin/bash-java-utils.sh" + +if [[ -z "${FLINK_CONF_DIR:-}" ]]; then + FLINK_CONF_DIR="$FLINK_DIR/conf" +fi +FLINK_CONF=${FLINK_CONF_DIR}/config.yaml +# Flatten the configuration file config.yaml to enable end-to-end test cases which will modify +# it directly through shell scripts. 
+output=$(updateAndGetFlinkConfiguration "${FLINK_CONF_DIR}" "${FLINK_DIR}/bin" "${FLINK_DIR}/lib" -flatten) +echo "$output" > $FLINK_CONF NODENAME=${NODENAME:-"localhost"} @@ -143,14 +153,14 @@ function swap_planner_scala_with_planner_loader() { function delete_config_key() { local config_key=$1 - sed -i -e "/^${config_key}: /d" ${FLINK_DIR}/conf/flink-conf.yaml + sed -i -e "/^${config_key}: /d" $FLINK_CONF } function set_config_key() { local config_key=$1 local value=$2 delete_config_key ${config_key} - echo "$config_key: $value" >> $FLINK_DIR/conf/flink-conf.yaml + echo "$config_key: $value" >> $FLINK_CONF } function create_ha_config() { @@ -166,7 +176,7 @@ function create_ha_config() { # This must have all the masters to be used in HA. echo "localhost:8081" > ${FLINK_DIR}/conf/masters - # then move on to create the flink-conf.yaml + # then move on to create the config.yaml #============================================================================== # Common #============================================================================== @@ -688,7 +698,7 @@ function setup_flink_slf4j_metric_reporter() { METRIC_NAME_PATTERN="${1:-"*"}" set_config_key "metrics.reporter.slf4j.factory.class" "org.apache.flink.metrics.slf4j.Slf4jReporterFactory" set_config_key "metrics.reporter.slf4j.interval" "1 SECONDS" - set_config_key "metrics.reporter.slf4j.filter.includes" "*:${METRIC_NAME_PATTERN}" + set_config_key "metrics.reporter.slf4j.filter.includes" "'*:${METRIC_NAME_PATTERN}'" } function get_job_exceptions { diff --git a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh index 97fcca09d5a..ec582c05f88 100755 --- a/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh +++ b/flink-end-to-end-tests/test-scripts/common_yarn_docker.sh @@ -131,10 +131,10 @@ security.kerberos.login.principal: hadoop-user slot.request.timeout: 120000 END ) - docker exec master bash -c "echo \"${FLINK_CONFIG}\" > 
/home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" + docker exec master bash -c "echo \"${FLINK_CONFIG}\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" echo "Flink config:" - docker exec master bash -c "cat /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" + docker exec master bash -c "cat /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" } function debug_copy_and_show_logs { diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh index b9134ebcb4a..a078e84da6b 100755 --- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh +++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh @@ -67,9 +67,13 @@ on_exit test_clean_up cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf" -echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml" -echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml" -echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml" +# standard yaml do not allow duplicate keys +sed -i -e "/^taskmanager.memory.task.off-heap.size: /d" "${TEST_DATA_DIR}/conf/config.yaml" +sed -i -e "/^taskmanager.memory.process.size: /d" "${TEST_DATA_DIR}/conf/config.yaml" +sed -i -e "/^taskmanager.numberOfTaskSlots: /d" "${TEST_DATA_DIR}/conf/config.yaml" +echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/config.yaml" +echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/config.yaml" +echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/config.yaml" export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf" FLINK_PYTHON_DIR=`cd "${CURRENT_DIR}/../../flink-python" && pwd -P` diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh index 1274a38851c..bb9e456a06d 100755 --- 
a/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh +++ b/flink-end-to-end-tests/test-scripts/test_yarn_application_kerberos_docker.sh @@ -81,7 +81,7 @@ if [[ ! "${YARN_APPLICATION_LOGS}" =~ "Receive initial delegation tokens from re fi echo "Running Job without configured keytab, the exception you see below is expected" -docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" +docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" # verify that it doesn't work if we don't configure a keytab docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \ /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run-application \ diff --git a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh index f80e0635224..37e839103a9 100755 --- a/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh +++ b/flink-end-to-end-tests/test-scripts/test_yarn_job_kerberos_docker.sh @@ -66,7 +66,7 @@ else fi echo "Running Job without configured keytab, the exception you see below is expected" -docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/flink-conf.yaml" +docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml" # verify that it doesn't work if we don't configure a keytab docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \ /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run \ diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java index 31e89dacac4..badd86072d8 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java +++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java @@ -112,7 +112,7 @@ public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceS final long splitLength = split.length(); // Using Flink FileSystem instead of Hadoop FileSystem directly, so we can get the hadoop - // config that create inputFile needed from flink-conf.yaml + // config that create inputFile needed from config.yaml final FileSystem fs = filePath.getFileSystem(); final ParquetInputFile inputFile = new ParquetInputFile(fs.open(filePath), fs.getFileStatus(filePath).getLen()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index d67893e4d21..3a12968663e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1564,7 +1564,7 @@ public abstract class DataSet<T> { * dataset.writeAsText("file:///path1"); }</pre> * <li>A directory is always created when <a * href="https://nightlies.apache.org/flink/flink-docs-master/setup/config.html#file-systems">fs.output.always-create-directory</a> - * is set to true in flink-conf.yaml file, even when parallelism is set to 1. + * is set to true in config.yaml file, even when parallelism is set to 1. * <pre>{@code . 
* └── path1/ * └── 1 }</pre> diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 27382e41be4..8165ed2e5d2 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -301,7 +301,7 @@ public class KubernetesConfigOptions { .stringType() .defaultValue("/opt/flink/conf") .withDescription( - "The flink conf directory that will be mounted in pod. The flink-conf.yaml, log4j.properties, " + "The flink conf directory that will be mounted in pod. The config.yaml, log4j.properties, " + "logback.xml in this path will be overwritten from config map."); public static final ConfigOption<String> FLINK_LOG_DIR = diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java index 2203770e267..3c3ee9a5154 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -64,7 +64,7 @@ import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or * TaskManager pod. 
*/ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java index c6b335449a8..21afbefc287 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java @@ -78,7 +78,7 @@ public interface KubernetesParameters { */ List<Map<String, String>> getTolerations(); - /** Directory in Pod that stores the flink-conf.yaml, log4j.properties, and the logback.xml. */ + /** Directory in Pod that stores the config.yaml, log4j.properties, and the logback.xml. */ String getFlinkConfDirInPod(); /** Directory in Pod that saves the log files. */ diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt index 2d74afd58c2..f692a63f0f8 100755 --- a/flink-python/dev/dev-requirements.txt +++ b/flink-python/dev/dev-requirements.txt @@ -32,3 +32,4 @@ pemja==0.4.1; platform_system != 'Windows' httplib2>=0.19.0 protobuf>=3.19.0 pytest~=7.0 +pyyaml>=6.0.1 diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py index 54e82962f43..35416b8fa47 100644 --- a/flink-python/pyflink/common/configuration.py +++ b/flink-python/pyflink/common/configuration.py @@ -69,10 +69,21 @@ class Configuration: jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key() classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key() if key in [jars_key, classpaths_key]: - add_jars_to_context_class_loader(value.split(";")) + jar_urls = Configuration.parse_jars_value(value, jvm) + add_jars_to_context_class_loader(jar_urls) self._j_configuration.setString(key, value) return self + @staticmethod 
+ def parse_jars_value(value: str, jvm): + is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml() + if is_standard_yaml: + import yaml + jar_urls_list = yaml.safe_load(value) + if isinstance(jar_urls_list, list): + return jar_urls_list + return value.split(";") + def get_integer(self, key: str, default_value: int) -> int: """ Returns the value associated with the given key as an integer. diff --git a/flink-python/pyflink/common/restart_strategy.py b/flink-python/pyflink/common/restart_strategy.py index b1c4621bf3b..52962e138e6 100644 --- a/flink-python/pyflink/common/restart_strategy.py +++ b/flink-python/pyflink/common/restart_strategy.py @@ -152,7 +152,7 @@ class RestartStrategies(object): """ Restart strategy configuration that could be used by jobs to use cluster level restart strategy. Useful especially when one has a custom implementation of restart strategy set via - flink-conf.yaml. + config.yaml. """ def __init__(self, j_restart_strategy=None): diff --git a/flink-python/pyflink/datastream/state_backend.py b/flink-python/pyflink/datastream/state_backend.py index 6ffe21912be..7b4c69917bb 100644 --- a/flink-python/pyflink/datastream/state_backend.py +++ b/flink-python/pyflink/datastream/state_backend.py @@ -280,7 +280,7 @@ class EmbeddedRocksDBStateBackend(StateBackend): Sets the predefined options for RocksDB. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the here specified predefined options and customized options. @@ -301,7 +301,7 @@ class EmbeddedRocksDBStateBackend(StateBackend): are :data:`PredefinedOptions.DEFAULT`. 
If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the predefined and customized options. @@ -320,7 +320,7 @@ class EmbeddedRocksDBStateBackend(StateBackend): The options created by the factory here are applied on top of the pre-defined options profile selected via :func:`set_predefined_options` and user-configured - options from configuration set through flink-conf.yaml with keys in + options from configuration set through config.yaml with keys in ``RocksDBConfigurableOptions``. :param options_factory_class_name: The fully-qualified class name of the options @@ -383,7 +383,7 @@ class MemoryStateBackend(StateBackend): >> env.set_state_backend(HashMapStateBackend()) >> env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage()) - If you are configuring your state backend via the `flink-conf.yaml` please make the following + If you are configuring your state backend via the `config.yaml` please make the following changes. ``` @@ -535,7 +535,7 @@ class FsStateBackend(StateBackend): >> env.set_state_backend(HashMapStateBackend()) >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints") - If you are configuring your state backend via the `flink-conf.yaml` please set your state + If you are configuring your state backend via the `config.yaml` please set your state backend type to `hashmap`. This state backend holds the working state in the memory (JVM heap) of the TaskManagers. @@ -717,7 +717,7 @@ class RocksDBStateBackend(StateBackend): >> env.set_state_backend(EmbeddedRocksDBStateBackend()) >> env.get_checkpoint_config().set_checkpoint_storage("hdfs://checkpoints") - If you are configuring your state backend via the `flink-conf.yaml` no changes are required. 
+ If you are configuring your state backend via the `config.yaml` no changes are required. A State Backend that stores its state in ``RocksDB``. This state backend can store very large state that exceeds memory and spills to disk. @@ -862,7 +862,7 @@ class RocksDBStateBackend(StateBackend): Sets the predefined options for RocksDB. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the here specified predefined options and customized options. @@ -882,7 +882,7 @@ class RocksDBStateBackend(StateBackend): are :data:`PredefinedOptions.DEFAULT`. If user-configured options within ``RocksDBConfigurableOptions`` is set (through - flink-conf.yaml) or a user-defined options factory is set (via :func:`setOptions`), + config.yaml) or a user-defined options factory is set (via :func:`setOptions`), then the options from the factory are applied on top of the predefined and customized options. diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index d1b7ce41118..20fcf85d7ed 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -842,7 +842,7 @@ class StreamExecutionEnvironment(object): method returns a local execution environment. When executed from the command line the given configuration is stacked on top of the - global configuration which comes from the flink-conf.yaml, potentially overriding + global configuration which comes from the config.yaml, potentially overriding duplicated options. :param configuration: The configuration to instantiate the environment with. 
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py index dcb3f34884e..09b77231ef7 100644 --- a/flink-python/pyflink/pyflink_gateway_server.py +++ b/flink-python/pyflink/pyflink_gateway_server.py @@ -43,29 +43,43 @@ def on_windows(): return platform.system() == "Windows" -def read_from_config(key, default_value, flink_conf_file): - value = default_value - # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI - # using the tainted value and might allow an attacker to access, modify, or test the existence - # of critical or sensitive files. - with open(os.path.realpath(flink_conf_file), "r") as f: - while True: - line = f.readline() - if not line: - break - if line.startswith("#") or len(line.strip()) == 0: - continue - k, v = line.split(":", 1) - if k.strip() == key: - value = v.strip() - return value +def read_from_config(key, default_value, flink_conf_directory): + import yaml + # try to find flink-conf.yaml file in flink_conf_directory + flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml") + if os.path.isfile(flink_conf_file): + # If flink-conf.yaml exists, use the old parsing logic to read the value + # get the realpath of tainted path value to avoid CWE22 problem that constructs a path + # or URI using the tainted value and might allow an attacker to access, modify, or test + # the existence of critical or sensitive files. 
+ with open(os.path.realpath(flink_conf_file), "r") as f: + while True: + line = f.readline() + if not line: + break + if line.startswith("#") or len(line.strip()) == 0: + continue + k, v = line.split(":", 1) + if k.strip() == key: + return v.strip() + else: + # If flink-conf.yaml does not exist, try to find config.yaml instead + config_file = os.path.join(flink_conf_directory, "config.yaml") + if os.path.isfile(config_file): + # If config.yaml exists, use YAML parser to read the value + with open(os.path.realpath(config_file), "r") as f: + config = yaml.safe_load(f) + return config.get(key, default_value) + + # If neither file exists, return the default value + return default_value def find_java_executable(): java_executable = "java.exe" if on_windows() else "java" flink_home = _find_flink_home() - flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml") - java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file) + flink_conf_directory = os.path.join(flink_home, "conf") + java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_directory) if java_home is None and "JAVA_HOME" in os.environ: java_home = os.environ["JAVA_HOME"] @@ -120,13 +134,12 @@ def construct_log_settings(env): flink_home = os.path.realpath(_find_flink_home()) flink_conf_dir = env['FLINK_CONF_DIR'] - flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") if "FLINK_LOG_DIR" in env: flink_log_dir = env["FLINK_LOG_DIR"] else: flink_log_dir = read_from_config( - KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file) + KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), env['FLINK_CONF_DIR']) if "LOG4J_PROPERTIES" in env: log4j_properties = env["LOG4J_PROPERTIES"] @@ -156,14 +169,13 @@ def construct_log_settings(env): def get_jvm_opts(env): - flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") jvm_opts = env.get("FLINK_ENV_JAVA_OPTS") if jvm_opts is None: - default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, 
"", flink_conf_file) + default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", env['FLINK_CONF_DIR']) extra_jvm_opts = read_from_config( KEY_ENV_JAVA_OPTS, - read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", flink_conf_file), - flink_conf_file) + read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", env['FLINK_CONF_DIR']), + env['FLINK_CONF_DIR']) jvm_opts = default_jvm_opts + " " + extra_jvm_opts # Remove leading and trailing double quotes (if present) of value @@ -194,8 +206,6 @@ def construct_flink_classpath(env): def construct_hadoop_classpath(env): - flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") - hadoop_conf_dir = "" if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env: if os.path.isdir("/etc/hadoop/conf"): @@ -212,11 +222,11 @@ def construct_hadoop_classpath(env): return os.pathsep.join( [env.get("HADOOP_CLASSPATH", ""), env.get("YARN_CONF_DIR", - read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)), + read_from_config(KEY_ENV_YARN_CONF_DIR, "", env['FLINK_CONF_DIR'])), env.get("HADOOP_CONF_DIR", - read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)), + read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, env['FLINK_CONF_DIR'])), env.get("HBASE_CONF_DIR", - read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))]) + read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, env['FLINK_CONF_DIR']))]) def construct_test_classpath(env): diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py index bd12a447823..ba17767c776 100644 --- a/flink-python/pyflink/table/table_config.py +++ b/flink-python/pyflink/table/table_config.py @@ -36,7 +36,7 @@ class TableConfig(object): This class is a pure API class that abstracts configuration from various sources. 
Currently, configuration can be set in any of the following layers (in the given order): - - flink-conf.yaml + - config.yaml - CLI parameters - :class:`~pyflink.datastream.StreamExecutionEnvironment` when bridging to DataStream API - :func:`~EnvironmentSettings.Builder.with_configuration` @@ -106,8 +106,8 @@ class TableConfig(object): jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key() classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key() if key in [jars_key, classpaths_key]: - add_jars_to_context_class_loader(value.split(";")) - + jar_urls = Configuration.parse_jars_value(value, jvm) + add_jars_to_context_class_loader(jar_urls) return self def get_local_timezone(self) -> str: diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 3327261ac41..ebfcc7cb095 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1540,23 +1540,33 @@ class TableEnvironment(object): j_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), sys.executable) def _add_jars_to_j_env_config(self, config_key): - jvm = get_gateway().jvm jar_urls = self.get_config().get(config_key, None) - if jar_urls is not None: - # normalize + + if jar_urls: + jvm = get_gateway().jvm jar_urls_list = [] - for url in jar_urls.split(";"): - url = url.strip() - if url != "": - jar_urls_list.append(jvm.java.net.URL(url).toString()) + parsed_jar_urls = Configuration.parse_jars_value(jar_urls, jvm) + url_strings = [ + jvm.java.net.URL(url).toString() if url else "" + for url in parsed_jar_urls + ] + self._parse_urls(url_strings, jar_urls_list) + j_configuration = get_j_env_configuration(self._get_j_env()) - if j_configuration.containsKey(config_key): - for url in j_configuration.getString(config_key, "").split(";"): - url = url.strip() - if url != "" and url not in jar_urls_list: - jar_urls_list.append(url) + parsed_jar_urls = 
Configuration.parse_jars_value( + j_configuration.getString(config_key, ""), + jvm + ) + self._parse_urls(parsed_jar_urls, jar_urls_list) + j_configuration.setString(config_key, ";".join(jar_urls_list)) + def _parse_urls(self, jar_urls, jar_urls_list): + for url in jar_urls: + url = url.strip() + if url != "" and url not in jar_urls_list: + jar_urls_list.append(url) + def _get_j_env(self): return self._j_tenv.getPlanner().getExecEnv() diff --git a/flink-python/setup.py b/flink-python/setup.py index 7161354be9f..24637fe4c7d 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -325,7 +325,9 @@ try: 'pandas>=1.3.0', 'pyarrow>=5.0.0', 'pemja==0.4.1;platform_system != "Windows"', - 'httplib2>=0.19.0', apache_flink_libraries_dependency] + 'httplib2>=0.19.0', + 'pyyaml>=6.0.1', + apache_flink_libraries_dependency] setup( name='apache-flink', diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java index d7959c425e9..c58267e6fc9 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonConfig.java @@ -48,7 +48,7 @@ public class PythonConfig implements ReadableConfig { } /** - * Configuration adopted from the outer layer, e.g. flink-conf.yaml, command line arguments, + * Configuration adopted from the outer layer, e.g. config.yaml, command line arguments, * TableConfig, etc. */ private final ReadableConfig configuration; diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java index 17f425cebb8..142457bd19c 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java @@ -182,7 +182,7 @@ public class PythonOptions { .text("2. 
the command line option \"-pyclientexec\";") .linebreak() .text( - "3. the configuration 'python.client.executable' defined in flink-conf.yaml") + "3. the configuration 'python.client.executable' defined in config.yaml") .linebreak() .text("4. the environment variable PYFLINK_CLIENT_EXECUTABLE;") .build()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java index d0185c5f0ee..8ea11a65ba3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartBackoffTimeStrategyFactoryLoader.java @@ -46,8 +46,8 @@ public final class RestartBackoffTimeStrategyFactoryLoader { * <li>Strategy set within job graph, i.e. {@link * RestartStrategies.RestartStrategyConfiguration}, unless the config is {@link * RestartStrategies.FallbackRestartStrategyConfiguration}. - * <li>Strategy set in the cluster(server-side) config (flink-conf.yaml), unless the strategy - * is not specified + * <li>Strategy set in the cluster(server-side) config (config.yaml), unless the strategy is + * not specified * <li>{@link * FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory} if * checkpointing is enabled. 
Otherwise {@link diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java index ea4f007449b..df4809714ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java @@ -79,7 +79,7 @@ public class CheckpointStorageLoader { if (logger != null) { logger.debug( "The configuration {} has not be set in the current" - + " sessions flink-conf.yaml. Falling back to a default CheckpointStorage" + + " sessions config.yaml. Falling back to a default CheckpointStorage" + " type. Users are strongly encouraged explicitly set this configuration" + " so they understand how their applications are checkpointing" + " snapshots for fault-tolerance.", @@ -138,7 +138,7 @@ public class CheckpointStorageLoader { * StreamExecutionEnvironment}. * * <p>3) Use the {@link CheckpointStorage} instance configured via the clusters - * <b>flink-conf.yaml</b>. + * <b>config.yaml</b>. * * <p>4) Load a default {@link CheckpointStorage} instance. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index aded04d7b6b..ec219432faa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -67,7 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints"); * }</pre> * - * <p>If you are configuring your state backend via the {@code flink-conf.yaml} please make the + * <p>If you are configuring your state backend via the {@code config.yaml} please make the * following changes set your state backend type to "hashmap" {@code state.backend.type: hashmap}. * * <p>This state backend holds the working state in the memory (JVM heap) of the TaskManagers. The diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index 87d8d77803e..6345c2b4602 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); * }</pre> * - * <p>If you are configuring your state backend via the {@code flink-conf.yaml} please make the + * <p>If you are configuring your state backend via the {@code config.yaml} please make the * following changes: * * <pre>{@code diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 048515b5b44..6d15c53c1a2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -698,7 +698,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke * Sets the predefined options for RocksDB. * * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through - flink-conf.yaml) or a user-defined options factory is set (via {@link + config.yaml) or a user-defined options factory is set (via {@link * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on * top of the here specified predefined options and customized options. * @@ -714,7 +714,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke * PredefinedOptions#DEFAULT}. * * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through - flink-conf.yaml) of a user-defined options factory is set (via {@link + config.yaml) or a user-defined options factory is set (via {@link * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on * top of the predefined and customized options. 
* diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java index 69576520fc3..e78d9fb0eb5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java @@ -145,7 +145,7 @@ public class RocksDBOptions { + "This option only takes effect if neither '%s' nor '%s' are not configured. If none is configured " + "then each RocksDB column family state has its own memory caches (as controlled by the column " + "family options). " - + "The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (flink-conf.yaml)." + + "The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (config.yaml)." 
+ "Note, that this feature breaks resource isolation between the slots", USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key())); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 0c37a1e6bc5..dd3fcb70cf4 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -61,8 +61,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * env.getCheckpointConfig().setCheckpointStorage("hdfs://checkpoints"); * }</pre> * - * <p>If you are configuring your state backend via the {@code flink-conf.yaml} no changes are - * required. + * <p>If you are configuring your state backend via the {@code config.yaml} no changes are required. * * <p>A State Backend that stores its state in {@code RocksDB}. This state backend can store very * large state that exceeds memory and spills to disk. @@ -399,7 +398,7 @@ public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend * Sets the predefined options for RocksDB. * * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through - * flink-conf.yaml) or a user-defined options factory is set (via {@link + * config.yaml) or a user-defined options factory is set (via {@link * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on * top of the here specified predefined options and customized options. * @@ -415,7 +414,7 @@ public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend * PredefinedOptions#DEFAULT}. 
 * * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through - flink-conf.yaml) of a user-defined options factory is set (via {@link + config.yaml) or a user-defined options factory is set (via {@link * #setRocksDBOptions(RocksDBOptionsFactory)}), then the options from the factory are applied on * top of the predefined and customized options. * diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 82a42dfdb3c..34d27e1bb59 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -249,7 +249,7 @@ public class RocksDBStateBackendConfigTest { env.close(); } - /** Validates that user custom configuration from code should override the flink-conf.yaml. */ + /** Validates that user custom configuration from code should override the config.yaml. */ @Test public void testConfigureTimerServiceLoadingFromApplication() throws Exception { final MockEnvironment env = new MockEnvironmentBuilder().build(); @@ -524,7 +524,7 @@ public class RocksDBStateBackendConfigTest { // verify that we would use PredefinedOptions.DEFAULT by default. 
assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); - // verify that user could configure predefined options via flink-conf.yaml + // verify that user could configure predefined options via config.yaml Configuration configuration = new Configuration(); configuration.set( RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name()); @@ -640,7 +640,7 @@ public class RocksDBStateBackendConfigTest { String checkpointPath = tempFolder.newFolder().toURI().toString(); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); - // verify that user-defined options factory could be configured via flink-conf.yaml + // verify that user-defined options factory could be configured via config.yaml Configuration config = new Configuration(); config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), TestOptionsFactory.class.getName()); config.setString(TestOptionsFactory.BACKGROUND_JOBS_OPTION.key(), "4"); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f14f3d4400a..b179ead220d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -2591,7 +2591,7 @@ public class StreamExecutionEnvironment implements AutoCloseable { * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}. * * <p>When executed from the command line the given configuration is stacked on top of the - * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding + * global configuration which comes from the {@code config.yaml}, potentially overriding * duplicated options. 
* * @param configuration The configuration to instantiate the environment with. diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java index 551e05ef962..eae416cd0a6 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java @@ -127,8 +127,8 @@ public class DefaultContext { // ------------------------------------------------------------------------------------------- /** - * Build the {@link DefaultContext} from flink-conf.yaml, dynamic configuration and users - * specified jars. + * Build the {@link DefaultContext} from config.yaml, dynamic configuration and users specified + * jars. * * @param dynamicConfig user specified configuration. * @param dependencies user specified jars diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java index 0c3e3ede7a6..65daeb4a3a8 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java @@ -43,7 +43,7 @@ public class SqlGatewayRestEndpointTestUtils { return rebuildRestEndpointOptions(context.getEndpointOptions()); } - /** Create the configuration generated from flink-conf.yaml. */ + /** Create the configuration generated from config.yaml. 
*/ public static Configuration getFlinkConfig( String address, String bindAddress, String portRange) { final Configuration config = new Configuration(); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java index e5d22ed943e..8ca455d31b4 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java @@ -184,7 +184,7 @@ class SessionContextTest { CatalogListener1.class.getName(), CatalogListener2.class.getName())); - // Find and create listeners from flink-conf.yaml for session + // Find and create listeners from config.yaml for session flinkConfig.set( TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS, Arrays.asList(CatalogFactory1.IDENTIFIER, CatalogFactory2.IDENTIFIER)); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java index f80ad6393ec..3d3ffe29cca 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java @@ -50,7 +50,7 @@ import static org.apache.flink.table.api.internal.TableConfigValidation.validate * configuration can be set in any of the following layers (in the given order): * * <ol> - * <li>{@code flink-conf.yaml}, + * <li>{@code config.yaml}, * <li>CLI parameters, * <li>{@code StreamExecutionEnvironment} when bridging to DataStream API, * <li>{@link EnvironmentSettings.Builder#withConfiguration(Configuration)} / {@link @@ -103,7 +103,7 @@ public final class TableConfig implements 
WritableConfig, ReadableConfig { // Note to implementers: // TableConfig is a ReadableConfig which is built once the TableEnvironment is created and - // contains both the configuration defined in the execution context (flink-conf.yaml + CLI + // contains both the configuration defined in the execution context (config.yaml + CLI // params), stored in rootConfiguration, but also any extra configuration defined by the user in // the application, which has precedence over the execution configuration. // diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java index cd2bee6b39c..f3c79c4b35d 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainersSettings.java @@ -283,8 +283,8 @@ public class FlinkContainersSettings { } /** - * Sets a single Flink configuration parameter (the options for flink-conf.yaml) and returns - * a reference to this Builder enabling method chaining. + * Sets a single Flink configuration parameter (the options for config.yaml) and returns a + * reference to this Builder enabling method chaining. * * @param <T> The type parameter. * @param option The option. 
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java index 38b542e1310..1ee0c9f6999 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkImageBuilder.java @@ -125,7 +125,7 @@ public class FlinkImageBuilder { } /** - * Sets Flink configuration. This configuration will be used for generating flink-conf.yaml for + * Sets Flink configuration. This configuration will be used for generating config.yaml for * configuring JobManager and TaskManager. */ public FlinkImageBuilder setConfiguration(Configuration conf) { @@ -209,12 +209,10 @@ public class FlinkImageBuilder { final Path flinkConfFile = createTemporaryFlinkConfFile(conf, tempDirectory); final Path log4jPropertiesFile = createTemporaryLog4jPropertiesFile(tempDirectory); - // Copy flink-conf.yaml into image - // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we - // need to use the legacy flink conf file 'flink-conf.yaml' here. 
+ // Copy config.yaml into image filesToCopy.put( flinkConfFile, - Paths.get(flinkHome, "conf", GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME)); + Paths.get(flinkHome, "conf", GlobalConfiguration.FLINK_CONF_FILENAME)); filesToCopy.put( log4jPropertiesFile, Paths.get(flinkHome, "conf", LOG4J_PROPERTIES_FILENAME)); @@ -292,13 +290,10 @@ public class FlinkImageBuilder { private Path createTemporaryFlinkConfFile(Configuration finalConfiguration, Path tempDirectory) throws IOException { - // Create a temporary flink-conf.yaml file and write merged configurations into it - // NOTE: Before we change the default conf file in the flink-dist to 'config.yaml', we - // need to use the legacy flink conf file 'flink-conf.yaml' here. - Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.LEGACY_FLINK_CONF_FILENAME); + Path flinkConfFile = tempDirectory.resolve(GlobalConfiguration.FLINK_CONF_FILENAME); Files.write( flinkConfFile, - ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, true)); + ConfigurationUtils.convertConfigToWritableLines(finalConfiguration, false)); return flinkConfFile; } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index fcf32ea13be..ec7c6905ee7 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -821,10 +821,9 @@ public abstract class YarnTestBase { File flinkConfDirPath = TestUtils.findFile( - // NOTE: Before we change the default conf file in the flink-dist to - // 'config.yaml', we need to use the legacy flink conf file - // 'flink-conf.yaml' here. 
- flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"})); + flinkDistRootDir, + new ContainsName( + new String[] {GlobalConfiguration.FLINK_CONF_FILENAME})); assertThat(flinkConfDirPath).isNotNull(); final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath(); @@ -839,7 +838,8 @@ public abstract class YarnTestBase { FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); BootstrapTools.writeConfiguration( - globalConfiguration, new File(tempConfPathForSecureRun, "flink-conf.yaml")); + globalConfiguration, + new File(tempConfPathForSecureRun, GlobalConfiguration.FLINK_CONF_FILENAME)); String configDir = tempConfPathForSecureRun.getAbsolutePath(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 4f81077b16e..5d9c77695e0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -485,7 +485,7 @@ public final class Utils { * * @param flinkConfig The Flink configuration. * @param tmParams Parameters for the task manager. - * @param configDirectory The configuration directory for the flink-conf.yaml + * @param configDirectory The configuration directory for the config.yaml * @param logDirectory The log directory. * @param hasLogback Uses logback? * @param hasLog4j Uses log4j?