[ 
https://issues.apache.org/jira/browse/FLINK-40005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-40005:
-----------------------------------
    Labels: pull-request-available  (was: )

> [cdc-cli] CliExecutor in application mode parses the pipeline definition file 
> path as YAML content -> "Missing required field 'source'"
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40005
>                 URL: https://issues.apache.org/jira/browse/FLINK-40005
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>            Reporter: Jiyong Wang
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Flink CDC version
> * Flink CDC 3.6.0 ({{flink-cdc-dist-3.6.0-1.20.jar}})
> * Flink 1.20.x
> * Deployment: Application mode via Flink Kubernetes Operator 
> (FlinkDeployment), checkpoint/HA on S3
> h2. What happened
> Submitting a YAML pipeline in *application mode* fails to start. The 
> FlinkDeployment job is configured as:
> {code:yaml}
> job:
>   jarURI: local:///opt/flink/lib/flink-cdc-dist-3.6.0-1.20.jar
>   entryClass: org.apache.flink.cdc.cli.CliExecutor
>   args:
>     - /flink-cdc-resolved/pipeline.yaml   # local path to the pipeline definition file
> {code}
> JobManager fails during job graph construction with:
> {code}
> Missing required field "source" in top-level configuration.
> {code}
> The job never runs. The same {{pipeline.yaml}} submitted via session mode 
> ({{flink-cdc.sh}} / {{CliFrontend}}) works fine.
> h2. Root cause
> {{CliExecutor.main(String[] args)}} (the application-mode entry point) does:
> {code:java}
> PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration());
> {code}
> {{args[0]}} is the pipeline definition *file path*: the deployment executors 
> set {{ApplicationConfiguration.APPLICATION_ARGS = commandLine.getArgList()}}, 
> so the JobManager receives the path of the file shipped into its container. 
> However, {{YamlPipelineDefinitionParser.parse(String, Configuration)}} treats 
> its {{String}} argument as the YAML *content*, not a path. The path string 
> itself ({{/flink-cdc-resolved/pipeline.yaml}}) is therefore parsed as YAML, 
> yielding a document with no {{source}}/{{sink}} keys, which then fails 
> validation with _Missing required field "source"_.
> Session mode works because {{CliFrontend}} reads the file into content before 
> parsing.
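The path-versus-content confusion described above can be illustrated with a small, self-contained sketch. It does not use the real {{YamlPipelineDefinitionParser}}: {{topLevelKeys}} is a hypothetical toy helper that only collects unindented "key:" names, and the {{mysql}}/{{doris}} types in the sample content are placeholders. It only shows why a path string handed to a content parser yields no {{source}} key.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy stand-in for a YAML parser (assumption: not the Flink CDC parser). */
final class PathVersusContentDemo {

    /** Collects the unindented "key:" names of a very simple YAML document. */
    static Set<String> topLevelKeys(String yamlContent) {
        Set<String> keys = new LinkedHashSet<>();
        for (String line : yamlContent.split("\n")) {
            // Skip blank lines, comments, and indented (nested) lines.
            if (line.isEmpty()
                    || line.startsWith("#")
                    || Character.isWhitespace(line.charAt(0))) {
                continue;
            }
            int colon = line.indexOf(':');
            if (colon > 0) {
                keys.add(line.substring(0, colon).trim());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // What application mode effectively passes today: the path itself.
        String path = "/flink-cdc-resolved/pipeline.yaml";
        // What the parser expects: the content of that file (placeholder values).
        String content = "source:\n  type: mysql\nsink:\n  type: doris\n";

        System.out.println(topLevelKeys(path).contains("source"));    // false
        System.out.println(topLevelKeys(content).contains("source")); // true
    }
}
```

The path contains no mapping at all, so any validation that requires a top-level {{source}} key fails, which matches the error reported by the JobManager.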
> h2. Expected behavior
> In application mode {{CliExecutor}} should read the pipeline definition file 
> referenced by {{args[0]}} and parse its *content*.
> h2. Suggested fix
> Read the file content with the *local JVM file API* (not Flink's 
> {{FileSystem}}) and pass the content to the {{String}} (content) overload. 
> Using Flink {{FileSystem.get(localPath)}} is unreliable here because the 
> cluster default FileSystem is often S3 (checkpoints/HA on S3), so it would 
> not resolve the local file shipped next to the JM.
> {code:java}
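> // flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java (main)
> // Requires: java.nio.charset.StandardCharsets, java.nio.file.Files, java.nio.file.Paths
> // Read the file's content locally, then hand the content (not the path) to the parser.
> String pipelineDefContent =
>         new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
> PipelineDef pipelineDef =
>         pipelineDefinitionParser.parse(pipelineDefContent, new Configuration());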
> {code}
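As a slightly more defensive variant of the suggested fix, the file read could be wrapped in a small helper that fails with a clear message when the argument or the file is missing. This is only a sketch using the JDK file API: the class name {{PipelineDefinitionReader}}, the method {{readPipelineDefinition}}, and the error messages are hypothetical and not part of Flink CDC.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: reads the pipeline definition with the local JVM file API. */
final class PipelineDefinitionReader {

    /** Returns the UTF-8 content of the file named by args[0]. */
    static String readPipelineDefinition(String[] args) {
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException(
                    "Expected the pipeline definition file path as the first argument");
        }
        // Paths.get resolves against the local file system of the JVM,
        // independent of any cluster-wide default Flink FileSystem.
        Path path = Paths.get(args[0]);
        if (!Files.isRegularFile(path)) {
            throw new IllegalArgumentException(
                    "Pipeline definition file not found: " + path.toAbsolutePath());
        }
        try {
            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(
                    "Failed to read pipeline definition file: " + path, e);
        }
    }
}
```

In {{CliExecutor.main}}, the returned string would then be passed to {{pipelineDefinitionParser.parse(content, new Configuration())}}, as in the snippet from the issue. Failing early on a missing file gives a more direct error than a later validation failure about a missing {{source}} field.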
> h2. How to reproduce
> # Build a Flink 1.20 image with {{flink-cdc-dist-3.6.0-1.20.jar}} in 
> {{/opt/flink/lib}}.
> # Deploy a FlinkDeployment (application mode) with 
> {{entryClass=org.apache.flink.cdc.cli.CliExecutor}} and 
> {{args=[/path/to/pipeline.yaml]}}, with the pipeline file mounted or shipped 
> into the JM container.
> # The JM fails with {{Missing required field "source" in top-level 
> configuration}}.
> h2. Affected component
> {{flink-cdc-cli}}: {{org.apache.flink.cdc.cli.CliExecutor#main}}



