This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a7245189a8 [Improvement][Flink] Change Flink command into FLINK_HOME (#16004)
a7245189a8 is described below
commit a7245189a832a5eeda6d3fbe203dab431926fb5a
Author: JohnHuang <[email protected]>
AuthorDate: Wed May 29 13:32:31 2024 +0800
[Improvement][Flink] Change Flink command into FLINK_HOME (#16004)
---
.../plugin/task/flink/FlinkArgsUtilsTest.java | 13 +++++++------
.../dolphinscheduler/plugin/task/flink/FlinkConstants.java | 4 ++--
.../plugin/task/flink/FlinkArgsUtilsTest.java | 13 +++++++------
3 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 6c2c4c6100..1d116b8b8e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -69,7 +69,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run-application -t yarn-application -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run-application -t yarn-application
-ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main
/opt/job.jar",
joinStringListWithSpace(commandLine));
}
@@ -81,7 +81,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
flinkParameters.setFlinkVersion("<1.10");
@@ -89,7 +89,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));
flinkParameters.setFlinkVersion(">=1.12");
@@ -97,7 +97,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}
@@ -107,7 +107,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main
/opt/job.jar",
joinStringListWithSpace(commandLine));
}
@@ -117,7 +117,8 @@ public class FlinkArgsUtilsTest {
flinkParameters.setProgramType(ProgramType.SQL);
List<String> commandLine =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
- Assertions.assertEquals("sql-client.sh -i
/tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
+ Assertions.assertEquals(
+ "${FLINK_HOME}/bin/sql-client.sh -i
/tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
joinStringListWithSpace(commandLine));
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index b2d7607761..c12c0bfce3 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -27,14 +27,14 @@ public class FlinkConstants {
* flink command
* usage: flink run [OPTIONS] <jar-file> <arguments>
*/
- public static final String FLINK_COMMAND = "flink";
+ public static final String FLINK_COMMAND = "${FLINK_HOME}/bin/flink";
public static final String FLINK_RUN = "run";
/**
* flink sql command
* usage: sql-client.sh -i <initialization file>, -f <script file>
*/
- public static final String FLINK_SQL_COMMAND = "sql-client.sh";
+ public static final String FLINK_SQL_COMMAND =
"${FLINK_HOME}/bin/sql-client.sh";
/**
* flink run options
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 4ef1ad7091..b53260bb87 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -69,7 +69,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run-application -t yarn-application -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run-application -t yarn-application
-ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main
/opt/job.jar",
joinStringListWithSpace(commandLine));
}
@@ -81,7 +81,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine1));
flinkParameters.setFlinkVersion("<1.10");
@@ -89,7 +89,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine2));
flinkParameters.setFlinkVersion(">=1.12");
@@ -97,7 +97,7 @@ public class FlinkArgsUtilsTest {
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m
-ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm
demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
joinStringListWithSpace(commandLine3));
}
@@ -107,7 +107,7 @@ public class FlinkArgsUtilsTest {
List<String> commandLine =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
Assertions.assertEquals(
- "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
+ "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main
/opt/job.jar",
joinStringListWithSpace(commandLine));
}
@@ -117,7 +117,8 @@ public class FlinkArgsUtilsTest {
flinkParameters.setProgramType(ProgramType.SQL);
List<String> commandLine =
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(),
flinkParameters);
- Assertions.assertEquals("sql-client.sh -i
/tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
+ Assertions.assertEquals(
+ "${FLINK_HOME}/bin/sql-client.sh -i
/tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
joinStringListWithSpace(commandLine));
}