Gerk Elznik created FLINK-40055:
-----------------------------------

             Summary: Operator serializes managed-deployment config in its own 
YAML dialect, breaking Flink 1.x deployments when the operator itself runs on 
standard config.yaml
                 Key: FLINK-40055
                 URL: https://issues.apache.org/jira/browse/FLINK-40055
             Project: Flink
          Issue Type: Bug
          Components: Kubernetes Operator
    Affects Versions: kubernetes-operator-1.15.0
            Reporter: Gerk Elznik


When the operator's own configuration is loaded from a standard-mode {{config.yaml}} (rather than the legacy {{flink-conf.yaml}}), the config files it generates for *managed Flink 1.x deployments* contain standard-YAML-typed values that Flink 1.x's legacy parser cannot read. An application-mode deployment then crashloops at JobManager startup:
{code:java}
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not create application program.
java.lang.RuntimeException: java.net.URISyntaxException: Illegal character in scheme name at index 0: ['local:///opt/flink/usrlib/myjob.jar']
  at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.fetchArtifacts(...) [flink-dist-1.20.4.jar]
{code}
h3. Reproduction (A/B; the only variable is the operator's own config file format)

Same operator image (current main), same kind cluster, same {{v1_20}} application-mode FlinkDeployment ({{local://}} StateMachineExample):
|| ||operator reads {{config.yaml}}||operator reads {{flink-conf.yaml}} (today's chart default)||
|operator config mode|standard YAML|legacy|
|value written into the deployment's {{flink-conf.yaml}}|{{pipeline.jars: ['local:///…/myjob.jar']}}|{{pipeline.jars: local:///…/myjob.jar}}|
|{{v1_20}} JobManager|{{URISyntaxException}} → CrashLoopBackOff|READY / RUNNING|

Today this is reachable, e.g., via a conf-override directory or a custom mount that places a {{config.yaml}} in the operator's conf directory ({{GlobalConfiguration.loadConfiguration}} prefers it). FLINK-39791 (PR #1126) would make it the default path by mounting the operator config as {{config.yaml}}, which is how this bug was found; see the analysis on [https://github.com/apache/flink-kubernetes-operator/pull/1126].
h3. Root cause

The YAML dialect used for serialization is a *process-global* property in flink-core. Once the operator loads its own configuration from {{config.yaml}}, {{GlobalConfiguration.isStandardYaml()}} becomes true JVM-wide, and from then on every no-arg {{new Configuration()}} creates a standard-dialect config. This includes {{Configuration#clone()}}, which is implemented as {{new Configuration()}} + {{addAll}}. The managed deployment's effective config therefore speaks the *operator's* dialect instead of the *target deployment's*:
 # The deploy config handed to the reconciler ({{FlinkConfigManager#getConfig}} → {{cache.get(key).clone()}}) carries {{standardYaml=true}} via the process-global flag; the cached original also inherits it through {{new Configuration(defaultConfig)}} in {{getDefaultConfig}}.
 # {{AbstractFlinkService#removeOperatorConfigs}} round-trips that config through {{config.toMap()}} + {{setString}}. {{Configuration#toMap}} serializes every value with {{ConfigurationUtils.convertToString(value, standardYaml)}}; with {{standardYaml=true}}, a {{List}} (e.g. {{PipelineOptions.JARS}}, set as a typed list by {{FlinkConfigBuilder}}) becomes the string {{"['local:///…']"}}.
 # {{FlinkConfMountDecorator#getClusterSideConfData}} then correctly builds {{new Configuration(useStandardYamlConfig())}}, which is false for v1_x, so the file is named {{flink-conf.yaml}} and written legacy-style. But the damage is already done: the bracketed *string* is written verbatim. The comment there ("would always be false currently", from FLINK-37236) documents exactly the assumption that no longer holds once the operator itself runs on standard config.
 # Flink 1.x's legacy parser reads {{['local://…']}} as a raw string, and parsing that string as a URI fails with {{URISyntaxException}}.
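The four steps can be condensed into a small, self-contained Java model. This is a hypothetical sketch, not operator or flink-core code: {{ModelConfig}}, {{globalStandardYaml}}, {{render}}, {{legacyFileLine}} and {{parseScheme}} are illustrative names, and list rendering is simplified (single-quoted flow list for standard YAML, {{;}}-joined for legacy, no escaping). It assumes only what the root-cause analysis states: no-arg construction picks up a JVM-wide dialect flag, {{clone()}} is "new + addAll", and {{toMap()}} stringifies in the config's own dialect. The last step uses the real {{java.net.URI}} parser, which rejects the bracketed value with the same message as the JobManager log.

{code:java}
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of the dialect leak; it does NOT use flink-core.
final class DialectLeakModel {

    // Stand-in for GlobalConfiguration.isStandardYaml(): JVM-wide state that
    // flips once the operator loads its own config.yaml.
    static boolean globalStandardYaml = false;

    static final class ModelConfig {
        final boolean standardYaml;
        final Map<String, Object> values = new LinkedHashMap<>();

        // No-arg construction silently picks up the process-global dialect.
        ModelConfig() {
            this(globalStandardYaml);
        }

        ModelConfig(boolean standardYaml) {
            this.standardYaml = standardYaml;
        }

        // clone() modeled as "new ModelConfig() + addAll": the source dialect is lost.
        ModelConfig copy() {
            ModelConfig c = new ModelConfig();
            c.values.putAll(values);
            return c;
        }

        // toMap(): every value is stringified in THIS config's dialect.
        Map<String, String> toMap() {
            Map<String, String> out = new LinkedHashMap<>();
            values.forEach((k, v) -> out.put(k, render(v, standardYaml)));
            return out;
        }
    }

    // Simplified rendering (no escaping): standard YAML emits a single-quoted
    // flow list, legacy joins items with ';'. Plain strings pass through as-is.
    static String render(Object value, boolean standardYaml) {
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            return standardYaml
                    ? list.stream().map(o -> "'" + o + "'").collect(Collectors.joining(", ", "[", "]"))
                    : list.stream().map(String::valueOf).collect(Collectors.joining(";"));
        }
        return String.valueOf(value);
    }

    // Steps 1-3: clone, toMap()/setString round trip, then a legacy-mode file
    // write that copies the already-stringified value verbatim.
    static String legacyFileLine(ModelConfig deployConfig, String key) {
        Map<String, String> flattened = deployConfig.copy().toMap(); // steps 1-2
        ModelConfig legacyFile = new ModelConfig(false);              // step 3
        legacyFile.values.putAll(flattened);
        return key + ": " + legacyFile.toMap().get(key);
    }

    // Step 4: the 1.x side treats the raw value as a single URI; a parse
    // failure is wrapped in a RuntimeException, as in the trace's first line.
    static String parseScheme(String rawValue) {
        try {
            return new URI(rawValue).getScheme();
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Even an explicitly legacy source config is affected: the copy decides.
        ModelConfig deploy = new ModelConfig(false);
        deploy.values.put("pipeline.jars", List.of("local:///opt/flink/usrlib/myjob.jar"));

        globalStandardYaml = false; // operator started on flink-conf.yaml
        System.out.println(legacyFileLine(deploy, "pipeline.jars"));

        globalStandardYaml = true; // operator started on config.yaml
        String line = legacyFileLine(deploy, "pipeline.jars");
        System.out.println(line);
        try {
            parseScheme(line.substring("pipeline.jars: ".length()));
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

In the model, the source config's own dialect never matters: the first no-arg copy fixes the dialect, and every later step only moves strings around.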

The chain was reproduced in isolation against flink-core 1.20.4: after {{GlobalConfiguration.loadConfiguration}} of a directory containing {{config.yaml}}, the clone → {{toMap}} → legacy-write sequence emits {{pipeline.jars: ['local:///…']}}, while a dialect-aware copy emits the legacy-parseable {{pipeline.jars: local:///…}}.

This affects any list-typed option (not just {{pipeline.jars}}) for any {{v1_xx}} deployment. Session-job submission has the same dialect leak: {{AbstractFlinkService#submitJobToSessionCluster}} sends {{conf.toMap()}} (strings in the operator's dialect) in the {{JarRunRequestBody}} to v1_17+ session clusters.
h3. Proposed fix

Fix at the *serialization boundaries*, leaving in-process parse semantics untouched. Stamping the target dialect onto the long-lived resource configs was prototyped and rejected: {{standardYaml}} also governs how raw string values are *parsed*, so it changes read behavior for every consumer, and any downstream {{new Configuration()}}/{{clone()}} silently resets it anyway.
 * {{AbstractFlinkService#removeOperatorConfigs}}: copy via the copy constructor and call {{removeKey}} directly instead of the {{toMap()}}/{{setString}} round trip, so typed values (e.g. the {{pipeline.jars}} list) survive untouched until the boundaries that already render per target version ({{FlinkConfMountDecorator}});
 * {{AbstractFlinkService#runJar}}: serialize the session-job REST config map in the receiving cluster's dialect (a throwaway dialect-stamped copy used only for the {{JarRunRequestBody}});
 * the Flink 2.0 dialect threshold becomes a single shared authority: {{FlinkConfMountDecorator#useStandardYamlConfig(FlinkVersion)}} (public static; the instance method delegates to it).
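The shape of the fix can be sketched the same way. This is again a hypothetical model, not the PR's code: configs are plain {{Map<String, Object>}} values, the Flink version is reduced to its major number, and {{OPERATOR_PREFIX}} assumes that operator-only options share the {{kubernetes.operator.}} prefix. It shows the two properties the fix relies on: removing operator configs never stringifies a value, and rendering happens only at a boundary that knows the target version, through one shared threshold.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of the proposed fix; not the operator's implementation.
final class DialectFixModel {

    // Assumption: operator-only options share this key prefix.
    static final String OPERATOR_PREFIX = "kubernetes.operator.";

    // Single shared threshold, modeled on useStandardYamlConfig(FlinkVersion):
    // Flink 2.0+ reads standard YAML, Flink 1.x reads the legacy format.
    static boolean useStandardYamlConfig(int flinkMajorVersion) {
        return flinkMajorVersion >= 2;
    }

    // Fixed removeOperatorConfigs: copy, then drop keys. Values keep their
    // types (a List stays a List); nothing is rendered to strings here.
    static Map<String, Object> removeOperatorConfigs(Map<String, Object> config) {
        Map<String, Object> copy = new LinkedHashMap<>(config);
        copy.keySet().removeIf(key -> key.startsWith(OPERATOR_PREFIX));
        return copy;
    }

    // Boundary rendering (mounted conf file or session-job REST map): the
    // dialect comes from the TARGET version, never from operator/JVM state.
    static Map<String, String> renderFor(Map<String, Object> config, int targetMajorVersion) {
        boolean standardYaml = useStandardYamlConfig(targetMajorVersion);
        Map<String, String> out = new LinkedHashMap<>();
        config.forEach((key, value) -> out.put(key, render(value, standardYaml)));
        return out;
    }

    // Simplified rendering (no escaping): standard YAML emits a single-quoted
    // flow list, legacy joins items with ';'. Plain strings pass through as-is.
    static String render(Object value, boolean standardYaml) {
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            return standardYaml
                    ? list.stream().map(o -> "'" + o + "'").collect(Collectors.joining(", ", "[", "]"))
                    : list.stream().map(String::valueOf).collect(Collectors.joining(";"));
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, Object> deployConfig = new LinkedHashMap<>();
        deployConfig.put("pipeline.jars", List.of("local:///opt/flink/usrlib/myjob.jar"));
        deployConfig.put("kubernetes.operator.reconcile.interval", "60 s");

        Map<String, Object> cleaned = removeOperatorConfigs(deployConfig);
        System.out.println(renderFor(cleaned, 1)); // v1_x target: legacy list
        System.out.println(renderFor(cleaned, 2)); // v2_x target: flow list
    }
}
{code}

Because the typed input carries no dialect of its own, the same config renders as a legacy line for v1 targets and a flow list for v2 targets regardless of which format the operator itself was started with.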

Verified with a from-source operator build across all four combinations of operator config format × deployment version: the {{v1_20}} application deployment that crashloops without the fix reaches RUNNING with it (operator on {{config.yaml}}), Flink 2.x deployments work under both operator formats, and the generated deployment config is identical regardless of the operator's own format (only per-run identity values differ). The fix was also exercised beyond CI with an in-place operator config-format migration (Helm chart flipped legacy ↔ standard under running v1 and v2 jobs, with JobManager pods killed after each flip to force a config re-read): no job disruption, no pod churn, and byte-identical generated configs. A unit test is included ({{AbstractFlinkServiceTest}}) that pins the process-global standard-YAML flag and asserts typed-value survival plus per-version serialization.

This is a prerequisite for FLINK-39791 (chart mounting the operator config as {{config.yaml}}).
h3. Issue links to add after creation
 * *blocks* FLINK-39791
 * *relates to* FLINK-37236



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
