Jiyong Wang created FLINK-40005:
-----------------------------------

             Summary: [cdc-cli] CliExecutor in application mode parses the 
pipeline definition file path as YAML content -> "Missing required field 
'source'"
                 Key: FLINK-40005
                 URL: https://issues.apache.org/jira/browse/FLINK-40005
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
            Reporter: Jiyong Wang


h2. Flink CDC version

* Flink CDC 3.6.0 ({{flink-cdc-dist-3.6.0-1.20.jar}})
* Flink 1.20.x
* Deployment: Application mode via Flink Kubernetes Operator (FlinkDeployment), 
checkpoint/HA on S3

h2. What happened

Submitting a YAML pipeline in *application mode* fails to start. The 
FlinkDeployment job is configured as:

{code:yaml}
job:
  jarURI: local:///opt/flink/lib/flink-cdc-dist-3.6.0-1.20.jar
  entryClass: org.apache.flink.cdc.cli.CliExecutor
  args:
    - /flink-cdc-resolved/pipeline.yaml   # local path to the pipeline definition file
{code}

The JobManager fails during job graph construction with:

{code}
Missing required field "source" in top-level configuration.
{code}

The job never runs. The same {{pipeline.yaml}} submitted via session mode 
({{flink-cdc.sh}} / {{CliFrontend}}) works fine.

h2. Root cause

{{CliExecutor.main(String[] args)}} (the application-mode entry point) does:

{code:java}
PipelineDef pipelineDef =
        pipelineDefinitionParser.parse(args[0], new Configuration());
{code}

{{args[0]}} is the pipeline definition *file path* (the deployment executors 
set {{ApplicationConfiguration.APPLICATION_ARGS = commandLine.getArgList()}}, 
i.e. the file path shipped into the JobManager container). But 
{{YamlPipelineDefinitionParser.parse(String, Configuration)}} treats its 
{{String}} argument as the YAML *content*, not a path. So the path string 
itself ({{/flink-cdc-resolved/pipeline.yaml}}) is parsed as YAML, producing a 
document with no {{source}}/{{sink}} keys, which then fails validation with 
_Missing required field "source"_.

Session mode works because {{CliFrontend}} reads the file into content before 
parsing.
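
The path-versus-content confusion can be illustrated with a small, 
self-contained sketch. This is *not* the Flink CDC parser: 
{{topLevelKeys}} is a hypothetical stand-in that only collects unindented 
{{key:}} entries, and the sample pipeline content (including the {{mysql}} and 
{{doris}} values) is made up for illustration. It shows why a bare path string 
yields no {{source}} key, while the file's content does.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy illustration only; this is NOT the Flink CDC YAML parser. */
final class PathVersusContentDemo {

    /**
     * Hypothetical stand-in for a YAML parser: collects unindented "key:" entries.
     * Real YAML parsing is far richer; this only mimics a top-level key lookup.
     */
    static Set<String> topLevelKeys(String yamlText) {
        Set<String> keys = new LinkedHashSet<>();
        for (String line : yamlText.split("\\R")) {
            // Skip blank lines, nested (indented) lines, and comments.
            if (line.isEmpty()
                    || Character.isWhitespace(line.charAt(0))
                    || line.startsWith("#")) {
                continue;
            }
            int colon = line.indexOf(':');
            // Treat "key:" followed by end-of-line or a space as a mapping key.
            if (colon > 0
                    && (colon == line.length() - 1 || line.charAt(colon + 1) == ' ')) {
                keys.add(line.substring(0, colon).trim());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // What application mode effectively hands to the parser today: the path itself.
        String path = "/flink-cdc-resolved/pipeline.yaml";
        // What the parser expects: the content of that file (sample content, assumed).
        String content = "source:\n  type: mysql\nsink:\n  type: doris\n";

        System.out.println(topLevelKeys(path));    // prints []
        System.out.println(topLevelKeys(content)); // prints [source, sink]
    }
}
```

A real YAML parser would read the path string as a single scalar rather than 
a mapping, which leads to the same outcome: there is no {{source}} key to 
validate.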

h2. Expected behavior

In application mode {{CliExecutor}} should read the pipeline definition file 
referenced by {{args[0]}} and parse its *content*.

h2. Suggested fix

Read the file with the *local JVM file API* (not Flink's {{FileSystem}}) and 
pass the resulting content to {{parse(String, Configuration)}}, which expects 
content. Flink's {{FileSystem.get(localPath)}} is unreliable here: the cluster 
default FileSystem is often S3 (checkpoints/HA on S3), so a path without a 
scheme would be resolved against S3 instead of the local file shipped next to 
the JM.

{code:java}
// flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java  (main)
// Needs imports: java.nio.charset.StandardCharsets, java.nio.file.Files, java.nio.file.Paths
String pipelineDefContent =
        new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
PipelineDef pipelineDef =
        pipelineDefinitionParser.parse(pipelineDefContent, new Configuration());
{code}
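
A slightly more defensive variant of the same idea is sketched below as a 
standalone class, so it can be compiled and run without Flink on the classpath. 
The class name {{PipelineDefinitionLoader}}, the method 
{{readPipelineDefinition}}, and the error messages are hypothetical and not 
part of Flink CDC. The sketch only covers the file-reading half of the fix; the 
returned content would still be passed to {{pipelineDefinitionParser.parse}} as 
in the snippet above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper, not part of Flink CDC: loads the pipeline definition text. */
final class PipelineDefinitionLoader {

    /** Reads the file named by args[0] from the local disk and decodes it as UTF-8. */
    static String readPipelineDefinition(String[] args) {
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException(
                    "Expected the pipeline definition file path as the first argument");
        }
        // java.nio resolves the path on the local file system of the JVM,
        // independent of any Flink FileSystem configuration.
        Path path = Paths.get(args[0]);
        if (!Files.isRegularFile(path)) {
            throw new IllegalArgumentException(
                    "Pipeline definition file not found: " + path.toAbsolutePath());
        }
        try {
            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read " + path, e);
        }
    }

    public static void main(String[] args) {
        String content = readPipelineDefinition(args);
        System.out.println(
                "Read " + content.length() + " characters of pipeline definition");
    }
}
```

Failing early with the resolved absolute path in the message should make a 
missing or mis-mounted file easier to diagnose than the downstream 
_Missing required field "source"_ error.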

h2. How to reproduce

# Build a Flink 1.20 image with {{flink-cdc-dist-3.6.0-1.20.jar}} in 
{{/opt/flink/lib}}.
# Deploy a FlinkDeployment (application mode) with 
{{entryClass=org.apache.flink.cdc.cli.CliExecutor}} and 
{{args=[/path/to/pipeline.yaml]}}, with the pipeline file mounted or shipped 
into the JM container.
# The JM fails with {{Missing required field "source" in top-level configuration}}.

h2. Affected component

{{flink-cdc-cli}}: {{org.apache.flink.cdc.cli.CliExecutor#main}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
