Jiyong Wang created FLINK-40005:
-----------------------------------
Summary: [cdc-cli] CliExecutor in application mode parses the
pipeline definition file path as YAML content -> "Missing required field
'source'"
Key: FLINK-40005
URL: https://issues.apache.org/jira/browse/FLINK-40005
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.6.0
Reporter: Jiyong Wang
h2. Flink CDC version
* Flink CDC 3.6.0 ({{flink-cdc-dist-3.6.0-1.20.jar}})
* Flink 1.20.x
* Deployment: Application mode via Flink Kubernetes Operator (FlinkDeployment),
checkpoint/HA on S3
h2. What happened
Submitting a YAML pipeline in *application mode* fails to start. The
FlinkDeployment job is configured as:
{code:yaml}
job:
  jarURI: local:///opt/flink/lib/flink-cdc-dist-3.6.0-1.20.jar
  entryClass: org.apache.flink.cdc.cli.CliExecutor
  args:
    - /flink-cdc-resolved/pipeline.yaml # local path to the pipeline definition file
{code}
JobManager fails during job graph construction with:
{code}
Missing required field "source" in top-level configuration.
{code}
The job never runs. The same {{pipeline.yaml}} submitted via session mode
({{flink-cdc.sh}} / {{CliFrontend}}) works fine.
h2. Root cause
{{CliExecutor.main(String[] args)}} (the application-mode entry point) does:
{code:java}
PipelineDef pipelineDef =
        pipelineDefinitionParser.parse(args[0], new Configuration());
{code}
{{args[0]}} is the pipeline definition *file path* (the deployment executors
set {{ApplicationConfiguration.APPLICATION_ARGS = commandLine.getArgList()}},
i.e. the file path shipped into the JobManager container). But
{{YamlPipelineDefinitionParser.parse(String, Configuration)}} treats its
{{String}} argument as the YAML *content*, not a path. So the path string
itself ({{/flink-cdc-resolved/pipeline.yaml}}) is parsed as YAML. It is read as
a single plain scalar rather than a mapping, so the resulting document has no
{{source}}/{{sink}} keys and fails validation with
_Missing required field "source"_.
Session mode works because {{CliFrontend}} reads the file into content before
parsing.
h2. Expected behavior
In application mode {{CliExecutor}} should read the pipeline definition file
referenced by {{args[0]}} and parse its *content*.
h2. Suggested fix
Read the file content with the *local JVM file API* (not Flink's
{{FileSystem}}) and pass the content to the {{String}} (content) overload.
Using Flink {{FileSystem.get(localPath)}} is unreliable here: a path without a
scheme is resolved against the cluster default FileSystem, which is often S3
(checkpoints/HA on S3), so it would not find the local file shipped next to the
JM.
{code:java}
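// flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliExecutor.java (main)
// Needs imports: java.nio.charset.StandardCharsets, java.nio.file.Files, java.nio.file.Paths
// Read the file with java.nio so the path is always resolved on the local disk.
String pipelineDefContent =
        new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
PipelineDef pipelineDef =
        pipelineDefinitionParser.parse(pipelineDefContent, new Configuration());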
{code}
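A slightly more defensive variant of the read step could fail fast with a clear message when the path is wrong, instead of surfacing a confusing parser error later. The following is only a sketch: the class {{PipelineDefinitionReader}} and the method {{readPipelineDefinition}} are hypothetical names that do not exist in Flink CDC, and it uses nothing beyond the JDK file API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink CDC: reads a pipeline definition
// from the local disk of the JVM it runs in (here, the JobManager).
final class PipelineDefinitionReader {

    private PipelineDefinitionReader() {}

    // Returns the UTF-8 content of the file at pathArg.
    // Throws NoSuchFileException if the path is not an existing regular file.
    static String readPipelineDefinition(String pathArg) throws IOException {
        Path path = Paths.get(pathArg).toAbsolutePath();
        if (!Files.isRegularFile(path)) {
            throw new NoSuchFileException(
                    path.toString(),
                    null,
                    "pipeline definition file not found or not a regular file");
        }
        // Files.readAllBytes keeps the snippet compatible with Java 8.
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: PipelineDefinitionReader <pipeline.yaml>");
            return;
        }
        String content = readPipelineDefinition(args[0]);
        System.out.println("Read " + content.length() + " characters from " + args[0]);
    }
}
```

In {{CliExecutor.main}} the returned string would then be handed to {{pipelineDefinitionParser.parse(content, new Configuration())}}, exactly as in the suggested fix above; the only addition is the early existence check.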
h2. How to reproduce
# Build a Flink 1.20 image with {{flink-cdc-dist-3.6.0-1.20.jar}} in
{{/opt/flink/lib}}.
# Deploy a FlinkDeployment (application mode) with
{{entryClass=org.apache.flink.cdc.cli.CliExecutor}} and
{{args=[/path/to/pipeline.yaml]}}, the pipeline file mounted/shipped into the
JM container.
# JM fails with {{Missing required field "source" in top-level configuration}}.
h2. Affected component
{{flink-cdc-cli}}: {{org.apache.flink.cdc.cli.CliExecutor#main}}