Gerk Elznik created FLINK-40055:
-----------------------------------
Summary: Operator serializes managed-deployment config in its own
YAML dialect, breaking Flink 1.x deployments when the operator itself runs on
standard config.yaml
Key: FLINK-40055
URL: https://issues.apache.org/jira/browse/FLINK-40055
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.15.0
Reporter: Gerk Elznik
When the operator's own configuration is loaded from a standard-mode
{{config.yaml}} (rather than the legacy {{flink-conf.yaml}}), the config files
it generates for *managed Flink 1.x deployments* contain values serialized in
standard-YAML syntax (e.g. flow-style lists) that Flink 1.x's legacy parser
cannot read. An application-mode deployment then crashloops at JobManager
startup:
{code:java}
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not create application program.
java.lang.RuntimeException: java.net.URISyntaxException: Illegal character in scheme name at index 0: ['local:///opt/flink/usrlib/myjob.jar']
	at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.fetchArtifacts(...) [flink-dist-1.20.4.jar]
{code}
h3. Reproduction (A/B; the only variable is the operator's own config file format)
Same operator image (current main), same kind cluster, same {{v1_20}}
application-mode FlinkDeployment ({{local://}} StateMachineExample):
|| ||operator reads {{config.yaml}}||operator reads {{flink-conf.yaml}} (today's chart default)||
|operator config mode|standard YAML|legacy|
|value written into the deployment's {{flink-conf.yaml}}|{{pipeline.jars: ['local:///…/myjob.jar']}}|{{pipeline.jars: local:///…/myjob.jar}}|
|{{v1_20}} JobManager|{{URISyntaxException}} → CrashLoopBackOff|READY / RUNNING|
This is already reachable today, e.g. via a conf-override directory or a
custom mount that places a {{config.yaml}} in the operator's conf directory
({{GlobalConfiguration.loadConfiguration}} prefers it). FLINK-39791 (PR #1126)
would make it the default path by mounting the operator config as
{{config.yaml}}; that is how this bug was found (see the analysis on
[https://github.com/apache/flink-kubernetes-operator/pull/1126]).
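To make the trigger concrete, the sketch below models only the file preference described above: whichever of the two files the operator ends up loading decides the dialect for the whole JVM. {{ConfDirDialect}} and its methods are hypothetical illustrations, not flink-core's {{GlobalConfiguration}} API, which handles more than this single file choice.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: models only the "config.yaml is preferred" rule,
// not the full GlobalConfiguration.loadConfiguration logic.
class ConfDirDialect {
    static final String STANDARD_FILE = "config.yaml";
    static final String LEGACY_FILE = "flink-conf.yaml";

    // The file that would be loaded: config.yaml wins whenever it is present.
    static Path chooseConfigFile(Path confDir) {
        Path standard = confDir.resolve(STANDARD_FILE);
        return Files.exists(standard) ? standard : confDir.resolve(LEGACY_FILE);
    }

    // True when the chosen file puts the whole JVM into the standard-YAML dialect.
    static boolean impliesStandardYaml(Path confDir) {
        return chooseConfigFile(confDir).getFileName().toString().equals(STANDARD_FILE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("operator-conf");
        Files.writeString(dir.resolve(LEGACY_FILE), "# legacy operator config\n");
        System.out.println("standard YAML? " + impliesStandardYaml(dir));
        // A conf-override dir or custom mount that adds config.yaml flips the result.
        Files.writeString(dir.resolve(STANDARD_FILE), "# standard operator config\n");
        System.out.println("standard YAML? " + impliesStandardYaml(dir));
    }
}
```

Nothing in the managed deployment changes between the two runs; only the presence of a file in the operator's own conf directory does.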
h3. Root cause
The YAML dialect used for serialization is a *process-global* property in
flink-core. Once the operator loads its own configuration from
{{config.yaml}}, {{GlobalConfiguration.isStandardYaml()}} becomes true
JVM-wide, and from then on every no-arg {{new Configuration()}}, including
{{Configuration#clone()}} (implemented as {{new Configuration()}} +
{{addAll}}), creates a standard-dialect config. The managed deployment's
effective config therefore speaks the *operator's* dialect instead of the
*target deployment's*:
# The deploy config handed to the reconciler
({{FlinkConfigManager#getConfig}} → {{cache.get(key).clone()}}) carries
{{standardYaml=true}} via the process-global flag; the cached original also
inherits it through {{new Configuration(defaultConfig)}} in
{{getDefaultConfig}}.
# {{AbstractFlinkService#removeOperatorConfigs}} round-trips that config
through {{config.toMap()}} + {{setString}}. {{Configuration#toMap}}
serializes every value with {{ConfigurationUtils.convertToString(value, standardYaml)}};
with {{standardYaml=true}}, a {{List}} (e.g. {{PipelineOptions.JARS}}, set as
a typed list by {{FlinkConfigBuilder}}) becomes the string
{{"['local:///…']"}}.
# {{FlinkConfMountDecorator#getClusterSideConfData}} then correctly builds
{{new Configuration(useStandardYamlConfig())}}, where the flag is false for
v1_x, so the file is named {{flink-conf.yaml}} and written legacy-style. The
damage is already done, though: the bracketed *string* is written verbatim.
The comment there ("would always be false currently", from FLINK-37236)
documents exactly the assumption that no longer holds once the operator itself
runs on standard config.
# Flink 1.x's legacy parser reads {{['local://…']}} as a raw string, and
building a URI from it throws {{URISyntaxException}}.
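The four steps can be replayed with a toy model in plain Java. {{LeakyConfig}} and {{LeakChain}} are hypothetical stand-ins, not flink-core classes: a static flag plays the role of the process-global state behind {{GlobalConfiguration.isStandardYaml()}}, list rendering is simplified (flow-style for standard YAML, {{;}}-joined for legacy, no escaping), and the legacy reader is reduced to "split on {{;}} and build a {{java.net.URI}}".

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of the leak; every name here is hypothetical, NOT flink-core's API.
class LeakyConfig {
    // Stand-in for the JVM-wide state behind GlobalConfiguration.isStandardYaml().
    static boolean globalStandardYaml = false;

    final boolean standardYaml;
    final Map<String, Object> values = new LinkedHashMap<>();

    LeakyConfig() { this(globalStandardYaml); }            // like a no-arg new Configuration()
    LeakyConfig(boolean standardYaml) { this.standardYaml = standardYaml; }

    // Like clone() == new Configuration() + addAll: the dialect comes from the global flag.
    LeakyConfig copy() {
        LeakyConfig c = new LeakyConfig();
        c.values.putAll(values);
        return c;
    }

    // Like toMap(): every value becomes a string in THIS config's dialect.
    Map<String, String> toMap() {
        Map<String, String> out = new LinkedHashMap<>();
        values.forEach((k, v) -> out.put(k, render(v, standardYaml)));
        return out;
    }

    // Simplified: flow-style list for standard YAML, ';'-joined for legacy; strings verbatim.
    static String render(Object v, boolean standardYaml) {
        if (!(v instanceof List)) return String.valueOf(v);
        List<?> list = (List<?>) v;
        if (standardYaml) {
            return list.stream().map(e -> "'" + e + "'").collect(Collectors.joining(", ", "[", "]"));
        }
        return list.stream().map(String::valueOf).collect(Collectors.joining(";"));
    }
}

class LeakChain {
    static final String JAR = "local:///opt/flink/usrlib/myjob.jar";

    // Steps 1-3: returns the line written into a v1_x flink-conf.yaml.
    static String writtenLine(boolean operatorLoadedConfigYaml) {
        LeakyConfig.globalStandardYaml = operatorLoadedConfigYaml;
        LeakyConfig cached = new LeakyConfig();
        cached.values.put("pipeline.jars", List.of(JAR));   // typed list, as the builder sets it
        LeakyConfig deploy = cached.copy();                  // 1. getConfig -> clone()
        LeakyConfig stripped = new LeakyConfig();
        stripped.values.putAll(deploy.toMap());              // 2. toMap + setString: list is now a String
        LeakyConfig clusterSide = new LeakyConfig(false);    // 3. legacy dialect for v1_x, too late
        clusterSide.values.putAll(stripped.values);
        return "pipeline.jars: " + LeakyConfig.render(clusterSide.values.get("pipeline.jars"), false);
    }

    // Step 4: a legacy reader treats the value as raw ';'-separated text and builds URIs.
    static void legacyFetch(String line) throws URISyntaxException {
        String raw = line.substring(line.indexOf(": ") + 2);
        for (String jar : raw.split(";")) {
            new URI(jar);
        }
    }

    public static void main(String[] args) {
        for (boolean standard : new boolean[] {false, true}) {
            String line = writtenLine(standard);
            try {
                legacyFetch(line);
                System.out.println(line + "  -> OK");
            } catch (URISyntaxException e) {
                System.out.println(line + "  -> " + e.getMessage());
            }
        }
    }
}
```

In this model the final writer is already dialect-correct; the bracketed value is produced earlier, by the {{toMap()}} round-trip running in the operator's dialect, which is why the fix targets that step rather than the file writer.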
The chain was reproduced in isolation against flink-core 1.20.4: after
{{GlobalConfiguration.loadConfiguration}} of a directory containing
{{config.yaml}}, the clone → {{toMap}} → legacy-write sequence emits
{{pipeline.jars: ['local:///…']}}, while a dialect-aware copy emits the
legacy-parseable {{pipeline.jars: local:///…}}.
This affects any list-typed option (not just {{pipeline.jars}}) for any
{{v1_xx}} deployment. Session-job submission has the same dialect leak:
{{AbstractFlinkService#submitJobToSessionCluster}} sends {{conf.toMap()}}
(strings in the operator's dialect) in the {{JarRunRequestBody}} to v1_17+
session clusters.
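Why every list-typed option is affected, including multi-element ones, can be seen by rendering a two-element list both ways and reading it back with legacy rules. {{ListDialects}} is a hypothetical, simplified model (the real serializer also handles escaping), not {{ConfigurationUtils}}: the legacy round-trip keeps both entries, while the standard-rendered string collapses into a single bogus entry. Operator-dialect strings of this kind are also what {{submitJobToSessionCluster}} currently places in the {{JarRunRequestBody}}.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified renderers; escaping of quotes and ';' is omitted.
class ListDialects {
    // Standard-YAML style: a flow sequence such as ['a', 'b'].
    static String renderStandard(List<String> list) {
        return list.stream().map(e -> "'" + e + "'").collect(Collectors.joining(", ", "[", "]"));
    }

    // Legacy style: elements joined with ';'.
    static String renderLegacy(List<String> list) {
        return String.join(";", list);
    }

    // Legacy reading of a raw value: split on ';' (escaping omitted in this sketch).
    static List<String> parseLegacy(String raw) {
        return Arrays.asList(raw.split(";"));
    }

    public static void main(String[] args) {
        List<String> jars = List.of("local:///opt/flink/usrlib/a.jar", "local:///opt/flink/usrlib/b.jar");
        System.out.println(parseLegacy(renderLegacy(jars)));   // two entries, as written
        System.out.println(parseLegacy(renderStandard(jars))); // one entry: the whole bracketed string
    }
}
```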
h3. Proposed fix
Fix at the *serialization boundaries*, leaving in-process parse semantics
untouched. (Stamping the target dialect onto the long-lived resource configs
was prototyped and rejected: {{standardYaml}} also governs how raw string
values are *parsed*, so it would change read behavior for every consumer, and
any downstream {{new Configuration()}}/{{clone()}} silently resets it anyway.)
The changes:
* {{AbstractFlinkService#removeOperatorConfigs}}: copy via the copy
constructor and call {{removeKey}} directly instead of the
{{toMap()}}/{{setString}} round-trip, so typed values (e.g. the
{{pipeline.jars}} list) reach the boundaries that already render per target
version ({{FlinkConfMountDecorator}}) untouched;
* {{AbstractFlinkService#runJar}}: serialize the session-job REST config map
in the receiving cluster's dialect (a throwaway dialect-stamped copy used only
for the {{JarRunRequestBody}});
* the Flink 2.0 dialect threshold becomes a single shared authority:
{{FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion)}} (public static;
the instance method delegates to it).
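A minimal sketch of the boundary approach, again as a toy model under stated assumptions: configs are plain {{Map<String, Object>}} instances, {{ToyFlinkVersion}} and {{BoundaryFix}} are hypothetical names (not the operator's {{FlinkVersion}} or {{FlinkConfMountDecorator}}), and operator-only options are assumed to be recognizable by a {{kubernetes.operator.}} key prefix. Typed values are copied as-is, and rendering to strings happens once, at the boundary, in the target version's dialect as decided by a single threshold function.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the operator's FlinkVersion enum.
enum ToyFlinkVersion { v1_19, v1_20, v2_0, v2_1 }

class BoundaryFix {
    // Assumption: operator-only options share this key prefix.
    static final String OPERATOR_PREFIX = "kubernetes.operator.";

    // Single authority for the dialect threshold: Flink 2.0+ targets get standard YAML.
    static boolean useStandardYamlConfig(ToyFlinkVersion version) {
        return version.compareTo(ToyFlinkVersion.v2_0) >= 0;
    }

    // removeOperatorConfigs without a toMap()/setString round-trip:
    // copy the typed values, then drop operator keys directly (like removeKey).
    static Map<String, Object> removeOperatorConfigs(Map<String, Object> conf) {
        Map<String, Object> copy = new LinkedHashMap<>(conf);
        copy.keySet().removeIf(k -> k.startsWith(OPERATOR_PREFIX));
        return copy;
    }

    // Serialization boundary (config file or REST map): render in the TARGET's dialect,
    // independent of which file the operator itself was started from.
    static Map<String, String> toMapFor(Map<String, Object> conf, ToyFlinkVersion target) {
        boolean standard = useStandardYamlConfig(target);
        Map<String, String> out = new LinkedHashMap<>();
        conf.forEach((k, v) -> out.put(k, render(v, standard)));
        return out;
    }

    // Simplified rendering: flow-style list vs ';'-joined list; other values verbatim.
    static String render(Object value, boolean standardYaml) {
        if (!(value instanceof List)) {
            return String.valueOf(value);
        }
        List<?> list = (List<?>) value;
        return standardYaml
                ? list.stream().map(e -> "'" + e + "'").collect(Collectors.joining(", ", "[", "]"))
                : list.stream().map(String::valueOf).collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new LinkedHashMap<>();
        conf.put("pipeline.jars", List.of("local:///opt/flink/usrlib/myjob.jar"));
        conf.put("kubernetes.operator.example-option", "operator-only"); // hypothetical key
        Map<String, Object> deployConf = removeOperatorConfigs(conf);
        for (ToyFlinkVersion v : ToyFlinkVersion.values()) {
            System.out.println(v + " -> " + toMapFor(deployConf, v));
        }
    }
}
```

Because {{toMapFor}} never consults a process-global flag, the v1_x rendering stays legacy-parseable regardless of the operator's own config format, which is the per-version property the unit test is meant to pin.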
Verified with a from-source operator build across all four combinations of
operator config format × deployment version: the {{v1_20}} application
deployment that crashloops without the fix reaches RUNNING with it (operator on
{{config.yaml}}), Flink 2.x deployments work under both operator formats, and
the generated deployment config is identical regardless of the operator's own
format (only per-run identity values differ). Also exercised beyond CI with an
in-place operator config-format migration (Helm chart flipped legacy↔standard
under running v1 and v2 jobs, with JobManager pods killed after each flip to
force a config re-read): no job disruption, no pod churn, and byte-identical
generated configs. A unit test ({{AbstractFlinkServiceTest}}) is included; it
pins the process-global standard-yaml flag and asserts typed-value survival
plus per-version serialization.
This is a prerequisite for FLINK-39791 (the chart mounting the operator config
as {{config.yaml}}).
h3. Issue links
* *blocks* FLINK-39791
* *relates to* FLINK-37236
--
This message was sent by Atlassian Jira
(v8.20.10#820010)