[
https://issues.apache.org/jira/browse/FLINK-40005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-40005:
-----------------------------------
Labels: pull-request-available (was: )
> [cdc-cli] CliExecutor in application mode parses the pipeline definition file
> path as YAML content -> "Missing required field 'source'"
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-40005
> URL: https://issues.apache.org/jira/browse/FLINK-40005
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Reporter: Jiyong Wang
> Priority: Major
> Labels: pull-request-available
>
> h2. Flink CDC version
> * Flink CDC 3.6.0 ({{flink-cdc-dist-3.6.0-1.20.jar}})
> * Flink 1.20.x
> * Deployment: Application mode via Flink Kubernetes Operator
> (FlinkDeployment), checkpoint/HA on S3
> h2. What happened
> Submitting a YAML pipeline in *application mode* fails to start. The
> FlinkDeployment job is configured as:
> {code:yaml}
> job:
>   jarURI: local:///opt/flink/lib/flink-cdc-dist-3.6.0-1.20.jar
>   entryClass: org.apache.flink.cdc.cli.CliExecutor
>   args:
>     - /flink-cdc-resolved/pipeline.yaml # local path to the pipeline definition file
> {code}
> The JobManager fails during job graph construction with:
> {code}
> Missing required field "source" in top-level configuration.
> {code}
> The job never runs. The same {{pipeline.yaml}} submitted via session mode
> ({{flink-cdc.sh}} / {{CliFrontend}}) works fine.
> h2. Root cause
> {{CliExecutor.main(String[] args)}} (the application-mode entry point) does:
> {code:java}
> PipelineDef pipelineDef = pipelineDefinitionParser.parse(args[0], new Configuration());
> {code}
> {{args[0]}} is the pipeline definition *file path* (the deployment executors
> set {{ApplicationConfiguration.APPLICATION_ARGS = commandLine.getArgList()}},
> i.e. the file path shipped into the JobManager container). But
> {{YamlPipelineDefinitionParser.parse(String, Configuration)}} treats its
> {{String}} argument as the YAML *content*, not a path. So the path string
> itself ({{/flink-cdc-resolved/pipeline.yaml}}) is parsed as YAML, producing a
> document with no {{source}}/{{sink}} keys, which then fails validation with
> _Missing required field "source"_.
> Session mode works because {{CliFrontend}} reads the file into content before
> parsing.
> h2. Expected behavior
> In application mode {{CliExecutor}} should read the pipeline definition file
> referenced by {{args[0]}} and parse its *content*.
> h2. Suggested fix
> Read the file content with the *local JVM file API* (not Flink's
> {{FileSystem}}) and pass the content to the {{String}} (content) overload.
> Using Flink {{FileSystem.get(localPath)}} is unreliable here because the
> cluster default FileSystem is often S3 (checkpoints/HA on S3), so it would
> not resolve the local file shipped next to the JM.
> {code:java}
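> // flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java (main)
> // Needs imports: java.nio.charset.StandardCharsets, java.nio.file.Files, java.nio.file.Paths
> // Read the file that args[0] points to, then hand its content (not the path) to the parser.
> String pipelineDefContent =
>         new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
> PipelineDef pipelineDef =
>         pipelineDefinitionParser.parse(pipelineDefContent, new Configuration());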
> {code}
> h2. How to reproduce
> # Build a Flink 1.20 image with {{flink-cdc-dist-3.6.0-1.20.jar}} in
> {{/opt/flink/lib}}.
> # Deploy a FlinkDeployment (application mode) with
> {{entryClass=org.apache.flink.cdc.cli.CliExecutor}} and
> {{args=[/path/to/pipeline.yaml]}}, with the pipeline file mounted or shipped
> into the JM container.
> # The JM fails with
> {{Missing required field "source" in top-level configuration}}.
> h2. Affected component
> {{flink-cdc-cli}}: {{org.apache.flink.cdc.cli.CliExecutor#main}}
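
The suggested fix quoted above can be sketched as a small standalone helper. This is only an illustration under stated assumptions: the class name PipelineDefinitionReader and the method readContent are hypothetical and not part of Flink CDC, and the argument check and missing-file check are additions that the issue does not ask for. It uses only java.nio.file, so the path is resolved on the local file system regardless of the cluster's default Flink FileSystem.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for illustration only; not part of Flink CDC. */
final class PipelineDefinitionReader {

    private PipelineDefinitionReader() {}

    /**
     * Reads the pipeline definition file referenced by args[0] from the local
     * file system and returns its content decoded as UTF-8.
     */
    static String readContent(String[] args) throws IOException {
        // Assumption: the file path is the first application argument, as in the issue.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException(
                    "Expected the pipeline definition file path as the first argument.");
        }
        Path path = Paths.get(args[0]);
        // Fail with a clear message instead of letting a later YAML validation error hide the cause.
        if (!Files.isRegularFile(path)) {
            throw new NoSuchFileException(
                    path.toString(),
                    null,
                    "pipeline definition file not found on the local file system");
        }
        // java.nio.file does not go through Flink's FileSystem abstraction,
        // so an S3 default scheme on the cluster does not affect this lookup.
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }
}
```

In CliExecutor.main, the returned string would then be passed to pipelineDefinitionParser.parse(content, new Configuration()) in place of args[0], as the issue proposes.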