This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new dafe98fa [Improvement] Fill in the execution mode for the statement (#268)
dafe98fa is described below

commit dafe98fa3ed165a8db08d149a9970378920aab51
Author: s7monk <[email protected]>
AuthorDate: Sun Jun 2 00:32:40 2024 +0800

    [Improvement] Fill in the execution mode for the statement (#268)
---
 .../paimon/web/server/service/impl/JobServiceImpl.java  | 14 ++++++++++++++
 .../paimon/web/server/controller/JobControllerTest.java | 17 +++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
index 40fb061c..adeab17e 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
@@ -100,6 +100,16 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
                     addPipelineNameStatement(pipelineName, jobSubmitDTO.getStatements()));
         }
 
+        if (jobSubmitDTO.isStreaming()) {
+            jobSubmitDTO.setStatements(
+                    addExecutionModeStatement(
+                            STREAMING_MODE.toLowerCase(), jobSubmitDTO.getStatements()));
+        } else {
+            jobSubmitDTO.setStatements(
+                    addExecutionModeStatement(
+                            BATCH_MODE.toLowerCase(), jobSubmitDTO.getStatements()));
+        }
+
         Executor executor =
                 this.getExecutor(jobSubmitDTO.getClusterId(), jobSubmitDTO.getTaskType());
         if (executor == null) {
@@ -437,6 +447,10 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
         return "SET 'pipeline.name' = '" + pipelineName + "';\n" + statements;
     }
 
+    private String addExecutionModeStatement(String executionMode, String statements) {
+        return "SET 'execution.runtime-mode' = '" + executionMode + "';\n" + statements;
+    }
+
     private boolean shouldCreateSession(String clusterId) {
         if (StpUtil.isLogin()) {
             SessionEntity session =
diff --git a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
index bac3f468..c943b6c7 100644
--- a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
+++ b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
@@ -24,12 +24,14 @@ import org.apache.paimon.web.server.data.dto.LoginDTO;
 import org.apache.paimon.web.server.data.dto.ResultFetchDTO;
 import org.apache.paimon.web.server.data.dto.StopJobDTO;
 import org.apache.paimon.web.server.data.model.ClusterInfo;
+import org.apache.paimon.web.server.data.model.History;
 import org.apache.paimon.web.server.data.result.R;
 import org.apache.paimon.web.server.data.vo.JobStatisticsVO;
 import org.apache.paimon.web.server.data.vo.JobStatusVO;
 import org.apache.paimon.web.server.data.vo.JobVO;
 import org.apache.paimon.web.server.data.vo.ResultDataVO;
 import org.apache.paimon.web.server.service.ClusterService;
+import org.apache.paimon.web.server.service.HistoryService;
 import org.apache.paimon.web.server.util.ObjectMapperUtils;
 import org.apache.paimon.web.server.util.StringUtils;
 
@@ -79,6 +81,8 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
 
     @Autowired private ClusterService clusterService;
 
+    @Autowired private HistoryService historyService;
+
     @BeforeEach
     public void before() throws Exception {
         LoginDTO login = new LoginDTO();
@@ -146,6 +150,7 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
         jobSubmitDTO.setJobName("flink-job-test");
         jobSubmitDTO.setTaskType("Flink");
+        jobSubmitDTO.setStreaming(true);
         jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
         jobSubmitDTO.setStatements(StatementsConstant.statement);
 
@@ -168,6 +173,7 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
         jobSubmitDTO.setJobName("flink-job-test-fetch-result");
         jobSubmitDTO.setTaskType("Flink");
+        jobSubmitDTO.setStreaming(true);
         jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
         jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
 
@@ -212,6 +218,7 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
         jobSubmitDTO.setJobName("flink-job-test-list-jobs");
         jobSubmitDTO.setTaskType("Flink");
+        jobSubmitDTO.setStreaming(true);
         jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
         jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
 
@@ -253,6 +260,7 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
         jobSubmitDTO.setJobName("flink-job-test-get-job-status");
         jobSubmitDTO.setTaskType("Flink");
+        jobSubmitDTO.setStreaming(true);
         jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
         jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
 
@@ -278,6 +286,7 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
         jobSubmitDTO.setJobName("flink-job-test-stop-job");
         jobSubmitDTO.setTaskType("Flink");
+        jobSubmitDTO.setStreaming(true);
         jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
         jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
         String responseString = submit(jobSubmitDTO);
@@ -331,6 +340,7 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
         jobSubmitDTO.setJobName("flink-job-test-get-job-statistics");
         jobSubmitDTO.setTaskType("Flink");
+        jobSubmitDTO.setStreaming(true);
         jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
         jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
         String responseString = submit(jobSubmitDTO);
@@ -362,6 +372,13 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
         assertEquals(0, getJobStatisticsRes.getData().getFailedNum());
     }
 
+    @Test
+    @Order(7)
+    public void testExecutionMode() {
+        History history = historyService.list().get(0);
+        assertTrue(history.getStatements().contains("SET 'execution.runtime-mode' = 'streaming';"));
+    }
+
     private String submit(JobSubmitDTO jobSubmitDTO) throws Exception {
         return mockMvc.perform(
                         MockMvcRequestBuilders.post(jobPath + "/submit")

Reply via email to