This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a7245189a8 [Improvement][Flink] Change Flink command into FLINK_HOME (#16004)
a7245189a8 is described below

commit a7245189a832a5eeda6d3fbe203dab431926fb5a
Author: JohnHuang <[email protected]>
AuthorDate: Wed May 29 13:32:31 2024 +0800

    [Improvement][Flink] Change Flink command into FLINK_HOME (#16004)
---
 .../plugin/task/flink/FlinkArgsUtilsTest.java               | 13 +++++++------
 .../dolphinscheduler/plugin/task/flink/FlinkConstants.java  |  4 ++--
 .../plugin/task/flink/FlinkArgsUtilsTest.java               | 13 +++++++------
 3 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 6c2c4c6100..1d116b8b8e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -69,7 +69,7 @@ public class FlinkArgsUtilsTest {
         List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
 
@@ -81,7 +81,7 @@ public class FlinkArgsUtilsTest {
                 FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine1));
 
         flinkParameters.setFlinkVersion("<1.10");
@@ -89,7 +89,7 @@ public class FlinkArgsUtilsTest {
                 FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine2));
 
         flinkParameters.setFlinkVersion(">=1.12");
@@ -97,7 +97,7 @@ public class FlinkArgsUtilsTest {
                 FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine3));
     }
 
@@ -107,7 +107,7 @@ public class FlinkArgsUtilsTest {
         List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
 
@@ -117,7 +117,8 @@ public class FlinkArgsUtilsTest {
         flinkParameters.setProgramType(ProgramType.SQL);
         List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
-        Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
+        Assertions.assertEquals(
+                "${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
                 joinStringListWithSpace(commandLine));
     }
 
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index b2d7607761..c12c0bfce3 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -27,14 +27,14 @@ public class FlinkConstants {
      * flink command
      * usage: flink run [OPTIONS] <jar-file> <arguments>
      */
-    public static final String FLINK_COMMAND = "flink";
+    public static final String FLINK_COMMAND = "${FLINK_HOME}/bin/flink";
     public static final String FLINK_RUN = "run";
 
     /**
      * flink sql command
      * usage: sql-client.sh -i <initialization file>, -f <script file>
      */
-    public static final String FLINK_SQL_COMMAND = "sql-client.sh";
+    public static final String FLINK_SQL_COMMAND = "${FLINK_HOME}/bin/sql-client.sh";
 
     /**
      * flink run options
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
index 4ef1ad7091..b53260bb87 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
@@ -69,7 +69,7 @@ public class FlinkArgsUtilsTest {
         List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
 
@@ -81,7 +81,7 @@ public class FlinkArgsUtilsTest {
                 FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine1));
 
         flinkParameters.setFlinkVersion("<1.10");
@@ -89,7 +89,7 @@ public class FlinkArgsUtilsTest {
                 FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine2));
 
         flinkParameters.setFlinkVersion(">=1.12");
@@ -97,7 +97,7 @@ public class FlinkArgsUtilsTest {
                 FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine3));
     }
 
@@ -107,7 +107,7 @@ public class FlinkArgsUtilsTest {
         List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
         Assertions.assertEquals(
-                "flink run -p 4 -sae -c org.example.Main /opt/job.jar",
+                "${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
                 joinStringListWithSpace(commandLine));
     }
 
@@ -117,7 +117,8 @@ public class FlinkArgsUtilsTest {
         flinkParameters.setProgramType(ProgramType.SQL);
         List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
 
-        Assertions.assertEquals("sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
+        Assertions.assertEquals(
+                "${FLINK_HOME}/bin/sql-client.sh -i /tmp/execution/app-id_init.sql -f /tmp/execution/app-id_node.sql",
                 joinStringListWithSpace(commandLine));
     }
 

Reply via email to