yuxiqian commented on code in PR #3823:
URL: https://github.com/apache/flink-cdc/pull/3823#discussion_r1903736412
##########
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java:
##########
@@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception {
assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
}
+ @Test
+ void testPipelineExecutingWithFlinkConfig() throws Exception {
+ CliExecutor executor =
+ createExecutor(
+ pipelineDef(),
+ "--flink-home",
+ flinkHome(),
+ "--global-config",
+ globalPipelineConfig(),
+ "--flink-conf",
+ "execution.target=yarn-session",
+ "--flink-conf",
+ "rest.bind-port=42689",
+ "-fc",
+ "yarn.application.id=application_1714009558476_3563",
+ "-fc",
+ "rest.bind-address=10.1.140.140");
+ Map<String, String> configMap = executor.getFlinkConfig().toMap();
+
assertThat(configMap.get("execution.target")).isEqualTo("yarn-session");
+ assertThat(configMap.get("rest.bind-port")).isEqualTo("42689");
+ assertThat(configMap.get("yarn.application.id"))
+ .isEqualTo("application_1714009558476_3563");
+
assertThat(configMap.get("rest.bind-address")).isEqualTo("10.1.140.140");
Review Comment:
```suggestion
assertThat(configMap)
.containsEntry("execution.target", "yarn-session")
.containsEntry("rest.bind-port", "42689")
.containsEntry("yarn.application.id",
"application_1714009558476_3563")
.containsEntry("rest.bind-address", "10.1.140.140");
```
##########
flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java:
##########
@@ -145,6 +146,31 @@ void testPipelineExecuting() throws Exception {
assertThat(executionInfo.getDescription()).isEqualTo("fake-description");
}
+ @Test
+ void testPipelineExecutingWithFlinkConfig() throws Exception {
Review Comment:
Please also add tests for malformed arguments (like when `=` is missing, or
`=` is part of value, etc.)
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java:
##########
@@ -114,6 +120,21 @@ static CliExecutor createExecutor(CommandLine commandLine)
throws Exception {
savepointSettings);
}
+ private static void overrideFlinkConfiguration(
+ Configuration flinkConfig, CommandLine commandLine) {
+ String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG);
+ if (flinkConfigs != null) {
+ LOG.info("Find flink config items: {}", String.join(",",
flinkConfigs));
+ for (String config : flinkConfigs) {
+ String key = config.split("=")[0].trim();
+ String value = config.split("=")[1].trim();
Review Comment:
We can split just once and verify the format first.
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java:
##########
@@ -114,6 +120,21 @@ static CliExecutor createExecutor(CommandLine commandLine)
throws Exception {
savepointSettings);
}
+ private static void overrideFlinkConfiguration(
+ Configuration flinkConfig, CommandLine commandLine) {
+ String[] flinkConfigs = commandLine.getOptionValues(FLINK_CONFIG);
+ if (flinkConfigs != null) {
+ LOG.info("Find flink config items: {}", String.join(",",
flinkConfigs));
Review Comment:
```suggestion
LOG.info("Dynamic flink config items found: {}", flinkConfigs);
```
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java:
##########
@@ -94,6 +94,15 @@ public class CliFrontendOptions {
+ "program that was part of the program
when the savepoint was triggered.")
.build();
+ public static final Option FLINK_CONFIG =
+ Option.builder("fc")
Review Comment:
[SQL
Client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#sql-client-startup-options)
provides `-D` to pass extra Flink options dynamically. Maybe we can follow the
same naming here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]