This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new dafe98fa [Improvement] Fill in the execution mode for the statement
(#268)
dafe98fa is described below
commit dafe98fa3ed165a8db08d149a9970378920aab51
Author: s7monk <[email protected]>
AuthorDate: Sun Jun 2 00:32:40 2024 +0800
[Improvement] Fill in the execution mode for the statement (#268)
---
.../paimon/web/server/service/impl/JobServiceImpl.java | 14 ++++++++++++++
.../paimon/web/server/controller/JobControllerTest.java | 17 +++++++++++++++++
2 files changed, 31 insertions(+)
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
index 40fb061c..adeab17e 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
@@ -100,6 +100,16 @@ public class JobServiceImpl extends ServiceImpl<JobMapper,
JobInfo> implements J
addPipelineNameStatement(pipelineName,
jobSubmitDTO.getStatements()));
}
+ if (jobSubmitDTO.isStreaming()) {
+ jobSubmitDTO.setStatements(
+ addExecutionModeStatement(
+ STREAMING_MODE.toLowerCase(),
jobSubmitDTO.getStatements()));
+ } else {
+ jobSubmitDTO.setStatements(
+ addExecutionModeStatement(
+ BATCH_MODE.toLowerCase(),
jobSubmitDTO.getStatements()));
+ }
+
Executor executor =
this.getExecutor(jobSubmitDTO.getClusterId(),
jobSubmitDTO.getTaskType());
if (executor == null) {
@@ -437,6 +447,10 @@ public class JobServiceImpl extends ServiceImpl<JobMapper,
JobInfo> implements J
return "SET 'pipeline.name' = '" + pipelineName + "';\n" + statements;
}
+ private String addExecutionModeStatement(String executionMode, String
statements) {
+ return "SET 'execution.runtime-mode' = '" + executionMode + "';\n" +
statements;
+ }
+
private boolean shouldCreateSession(String clusterId) {
if (StpUtil.isLogin()) {
SessionEntity session =
diff --git
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
index bac3f468..c943b6c7 100644
---
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
+++
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
@@ -24,12 +24,14 @@ import org.apache.paimon.web.server.data.dto.LoginDTO;
import org.apache.paimon.web.server.data.dto.ResultFetchDTO;
import org.apache.paimon.web.server.data.dto.StopJobDTO;
import org.apache.paimon.web.server.data.model.ClusterInfo;
+import org.apache.paimon.web.server.data.model.History;
import org.apache.paimon.web.server.data.result.R;
import org.apache.paimon.web.server.data.vo.JobStatisticsVO;
import org.apache.paimon.web.server.data.vo.JobStatusVO;
import org.apache.paimon.web.server.data.vo.JobVO;
import org.apache.paimon.web.server.data.vo.ResultDataVO;
import org.apache.paimon.web.server.service.ClusterService;
+import org.apache.paimon.web.server.service.HistoryService;
import org.apache.paimon.web.server.util.ObjectMapperUtils;
import org.apache.paimon.web.server.util.StringUtils;
@@ -79,6 +81,8 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
@Autowired private ClusterService clusterService;
+ @Autowired private HistoryService historyService;
+
@BeforeEach
public void before() throws Exception {
LoginDTO login = new LoginDTO();
@@ -146,6 +150,7 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test");
jobSubmitDTO.setTaskType("Flink");
+ jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.statement);
@@ -168,6 +173,7 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-fetch-result");
jobSubmitDTO.setTaskType("Flink");
+ jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
@@ -212,6 +218,7 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-list-jobs");
jobSubmitDTO.setTaskType("Flink");
+ jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
@@ -253,6 +260,7 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-get-job-status");
jobSubmitDTO.setTaskType("Flink");
+ jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
@@ -278,6 +286,7 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-stop-job");
jobSubmitDTO.setTaskType("Flink");
+ jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
String responseString = submit(jobSubmitDTO);
@@ -331,6 +340,7 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
JobSubmitDTO jobSubmitDTO = new JobSubmitDTO();
jobSubmitDTO.setJobName("flink-job-test-get-job-statistics");
jobSubmitDTO.setTaskType("Flink");
+ jobSubmitDTO.setStreaming(true);
jobSubmitDTO.setClusterId(String.valueOf(one.getId()));
jobSubmitDTO.setStatements(StatementsConstant.selectStatement);
String responseString = submit(jobSubmitDTO);
@@ -362,6 +372,13 @@ public class JobControllerTest extends
FlinkSQLGatewayTestBase {
assertEquals(0, getJobStatisticsRes.getData().getFailedNum());
}
+ @Test
+ @Order(7)
+ public void testExecutionMode() {
+ History history = historyService.list().get(0);
+ assertTrue(history.getStatements().contains("SET
'execution.runtime-mode' = 'streaming';"));
+ }
+
private String submit(JobSubmitDTO jobSubmitDTO) throws Exception {
return mockMvc.perform(
MockMvcRequestBuilders.post(jobPath + "/submit")