This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new f81a3e1b [Feature] Introduce web socket for job submission (#440)
f81a3e1b is described below
commit f81a3e1b0fdb7054379bd318d0d5f105c0359b86
Author: s7monk <[email protected]>
AuthorDate: Mon Jun 24 18:55:27 2024 +0800
[Feature] Introduce web socket for job submission (#440)
---
paimon-web-server/pom.xml | 5 ++
.../web/server/configrue/WebSocketConfig.java | 66 ++++++++++++++++++++++
.../paimon/web/server/data/dto/JobSubmitDTO.java | 2 +
.../paimon/web/server/data/model/JobInfo.java | 2 +
.../apache/paimon/web/server/data/vo/JobVO.java | 2 +
.../web/server/service/impl/JobServiceImpl.java | 9 +++
.../src/main/resources/application.yml | 7 ++-
.../web/server/controller/JobControllerTest.java | 4 +-
scripts/sql/paimon-mysql.sql | 1 +
9 files changed, 95 insertions(+), 3 deletions(-)
diff --git a/paimon-web-server/pom.xml b/paimon-web-server/pom.xml
index 72c092dd..ac352377 100644
--- a/paimon-web-server/pom.xml
+++ b/paimon-web-server/pom.xml
@@ -157,6 +157,11 @@ under the License.
<artifactId>spring-boot-starter-data-ldap</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-websocket</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/configrue/WebSocketConfig.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/configrue/WebSocketConfig.java
new file mode 100644
index 00000000..5f6c7428
--- /dev/null
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/configrue/WebSocketConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.server.configrue;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.simp.config.MessageBrokerRegistry;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+
+/** Websocket config. */
+@Configuration
+@EnableWebSocketMessageBroker
+public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
+
+ @Value("${websocket.pool.size}")
+ private int poolSize;
+
+ @Value("${websocket.heartbeat.send}")
+ private long sendHeartbeat;
+
+ @Value("${websocket.heartbeat.receive}")
+ private long receiveHeartbeat;
+
+ @Bean
+ public TaskScheduler webSocketTaskScheduler() {
+ ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+ scheduler.setPoolSize(poolSize);
+ scheduler.setThreadNamePrefix("ws-heartbeat-thread-");
+ scheduler.initialize();
+ return scheduler;
+ }
+
+ @Override
+ public void registerStompEndpoints(StompEndpointRegistry registry) {
+ registry.addEndpoint("/ws").setAllowedOrigins("*");
+ }
+
+ @Override
+ public void configureMessageBroker(MessageBrokerRegistry registry) {
+ registry.enableSimpleBroker("/topic")
+ .setHeartbeatValue(new long[] {sendHeartbeat, receiveHeartbeat})
+ .setTaskScheduler(webSocketTaskScheduler());
+ registry.setApplicationDestinationPrefixes("/app");
+ }
+}
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
index a8cce8b8..83e4c384 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
@@ -28,6 +28,8 @@ public class JobSubmitDTO {
private String jobName;
+ private String fileName;
+
private String taskType;
private boolean isStreaming;
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java
index ea05d14f..228045ad 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java
@@ -40,6 +40,8 @@ public class JobInfo extends BaseModel {
private String jobName;
+ private String fileName;
+
private String type;
private String executeMode;
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java
index 5e22b967..cb3277d7 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java
@@ -40,6 +40,8 @@ public class JobVO {
private String jobName;
+ private String fileName;
+
private String type;
private String executeMode;
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
index 6241db3a..917a44f4 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
@@ -61,6 +61,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -96,6 +97,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
@Autowired private HistoryService historyService;
+ @Autowired private SimpMessagingTemplate messagingTemplate;
+
@Override
public JobVO submitJob(JobSubmitDTO jobSubmitDTO) {
String pipelineName = getPipelineName(jobSubmitDTO.getStatements());
@@ -146,6 +149,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
logWriter.finish(
String.format(
"Execution successful [Job ID: %s].", executionResult.getJobId()));
+ messagingTemplate.convertAndSend(
+ "/topic/job", buildJobVO(executionResult, jobSubmitDTO));
historyService.saveHistory(
History.builder()
.name(LocalDateTimeUtil.getFormattedDateTime(LocalDateTime.now()))
@@ -324,6 +329,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
if (job != null && job.getUid().equals(userId)) {
String currentStatus = job.getStatus();
if (!jobStatus.equals(currentStatus)) {
+ job.setStatus(jobStatus);
+ messagingTemplate.convertAndSend("/topic/jobStatus", job);
if (JobStatus.RUNNING.getValue().equals(jobStatus)) {
updateJobStatusAndStartTime(jobId, jobStatus, startTime);
} else if (JobStatus.FINISHED.getValue().equals(jobStatus)
@@ -426,6 +433,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
.type(jobSubmitDTO.getTaskType())
.statements(jobSubmitDTO.getStatements())
.status(JobStatus.CREATED.getValue())
+ .fileName(jobSubmitDTO.getFileName())
.clusterId(jobSubmitDTO.getClusterId());
String jobName = jobSubmitDTO.getJobName() != null ? jobSubmitDTO.getJobName() : "";
@@ -466,6 +474,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
.resultData(executionResult.getData())
.clusterId(jobSubmitDTO.getClusterId())
.jobName(jobSubmitDTO.getJobName())
+ .fileName(jobSubmitDTO.getFileName())
.token(0L);
String executeMode = jobSubmitDTO.isStreaming() ? STREAMING_MODE : BATCH_MODE;
builder.executeMode(executeMode);
diff --git a/paimon-web-server/src/main/resources/application.yml b/paimon-web-server/src/main/resources/application.yml
index c0128576..96fe26c9 100644
--- a/paimon-web-server/src/main/resources/application.yml
+++ b/paimon-web-server/src/main/resources/application.yml
@@ -83,4 +83,9 @@ interceptor:
path:
patterns: /api/login, /ui/**
-
+websocket:
+ pool:
+ size: 1
+ heartbeat:
+ send: 30000
+ receive: 30000
diff --git
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
index c6a6a403..4f371f98 100644
--- a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
+++ b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
@@ -390,8 +390,8 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
ObjectMapperUtils.fromJSON(
getJobStatisticsResponseStr, new TypeReference<R<JobStatisticsVO>>() {});
assertEquals(200, getJobStatisticsRes.getCode());
- assertEquals(6, getJobStatisticsRes.getData().getTotalNum());
- assertEquals(5, getJobStatisticsRes.getData().getRunningNum());
+ assertEquals(7, getJobStatisticsRes.getData().getTotalNum());
+ assertEquals(6, getJobStatisticsRes.getData().getRunningNum());
assertEquals(1, getJobStatisticsRes.getData().getCanceledNum());
assertEquals(0, getJobStatisticsRes.getData().getFinishedNum());
assertEquals(0, getJobStatisticsRes.getData().getFailedNum());
diff --git a/scripts/sql/paimon-mysql.sql b/scripts/sql/paimon-mysql.sql
index b7b51415..b35e79ab 100644
--- a/scripts/sql/paimon-mysql.sql
+++ b/scripts/sql/paimon-mysql.sql
@@ -160,6 +160,7 @@ CREATE TABLE if not exists `job`
`id` int(11) not null auto_increment primary key comment 'id',
`job_id` varchar(100) not null comment 'job id',
`job_name` varchar(200) comment 'job name',
+ `file_name` varchar(200) comment 'file name',
`type` varchar(100) comment 'job type',
`execute_mode` varchar(50) comment 'execute mode',
`cluster_id` varchar(100) comment 'cluster id',