This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new f81a3e1b [Feature] Introduce web socket for job submission (#440)
f81a3e1b is described below

commit f81a3e1b0fdb7054379bd318d0d5f105c0359b86
Author: s7monk <[email protected]>
AuthorDate: Mon Jun 24 18:55:27 2024 +0800

    [Feature] Introduce web socket for job submission (#440)
---
 paimon-web-server/pom.xml                          |  5 ++
 .../web/server/configrue/WebSocketConfig.java      | 66 ++++++++++++++++++++++
 .../paimon/web/server/data/dto/JobSubmitDTO.java   |  2 +
 .../paimon/web/server/data/model/JobInfo.java      |  2 +
 .../apache/paimon/web/server/data/vo/JobVO.java    |  2 +
 .../web/server/service/impl/JobServiceImpl.java    |  9 +++
 .../src/main/resources/application.yml             |  7 ++-
 .../web/server/controller/JobControllerTest.java   |  4 +-
 scripts/sql/paimon-mysql.sql                       |  1 +
 9 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/paimon-web-server/pom.xml b/paimon-web-server/pom.xml
index 72c092dd..ac352377 100644
--- a/paimon-web-server/pom.xml
+++ b/paimon-web-server/pom.xml
@@ -157,6 +157,11 @@ under the License.
             <artifactId>spring-boot-starter-data-ldap</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.baomidou</groupId>
             <artifactId>mybatis-plus-boot-starter</artifactId>
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/configrue/WebSocketConfig.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/configrue/WebSocketConfig.java
new file mode 100644
index 00000000..5f6c7428
--- /dev/null
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/configrue/WebSocketConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.server.configrue;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.simp.config.MessageBrokerRegistry;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
+import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
+
+/** Websocket config. */
+@Configuration
+@EnableWebSocketMessageBroker
+public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
+
+    @Value("${websocket.pool.size}")
+    private int poolSize;
+
+    @Value("${websocket.heartbeat.send}")
+    private long sendHeartbeat;
+
+    @Value("${websocket.heartbeat.receive}")
+    private long receiveHeartbeat;
+
+    @Bean
+    public TaskScheduler webSocketTaskScheduler() {
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(poolSize);
+        scheduler.setThreadNamePrefix("ws-heartbeat-thread-");
+        scheduler.initialize();
+        return scheduler;
+    }
+
+    @Override
+    public void registerStompEndpoints(StompEndpointRegistry registry) {
+        registry.addEndpoint("/ws").setAllowedOrigins("*");
+    }
+
+    @Override
+    public void configureMessageBroker(MessageBrokerRegistry registry) {
+        registry.enableSimpleBroker("/topic")
+                .setHeartbeatValue(new long[] {sendHeartbeat, receiveHeartbeat})
+                .setTaskScheduler(webSocketTaskScheduler());
+        registry.setApplicationDestinationPrefixes("/app");
+    }
+}
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
index a8cce8b8..83e4c384 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
@@ -28,6 +28,8 @@ public class JobSubmitDTO {
 
     private String jobName;
 
+    private String fileName;
+
     private String taskType;
 
     private boolean isStreaming;
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java
index ea05d14f..228045ad 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/JobInfo.java
@@ -40,6 +40,8 @@ public class JobInfo extends BaseModel {
 
     private String jobName;
 
+    private String fileName;
+
     private String type;
 
     private String executeMode;
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java
index 5e22b967..cb3277d7 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/vo/JobVO.java
@@ -40,6 +40,8 @@ public class JobVO {
 
     private String jobName;
 
+    private String fileName;
+
     private String type;
 
     private String executeMode;
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
index 6241db3a..917a44f4 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
@@ -61,6 +61,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
@@ -96,6 +97,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
 
     @Autowired private HistoryService historyService;
 
+    @Autowired private SimpMessagingTemplate messagingTemplate;
+
     @Override
     public JobVO submitJob(JobSubmitDTO jobSubmitDTO) {
         String pipelineName = getPipelineName(jobSubmitDTO.getStatements());
@@ -146,6 +149,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
             logWriter.finish(
                     String.format(
                             "Execution successful [Job ID: %s].", executionResult.getJobId()));
+            messagingTemplate.convertAndSend(
+                    "/topic/job", buildJobVO(executionResult, jobSubmitDTO));
             historyService.saveHistory(
                     History.builder()
                             .name(LocalDateTimeUtil.getFormattedDateTime(LocalDateTime.now()))
@@ -324,6 +329,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
                         if (job != null && job.getUid().equals(userId)) {
                             String currentStatus = job.getStatus();
                             if (!jobStatus.equals(currentStatus)) {
+                                job.setStatus(jobStatus);
+                                messagingTemplate.convertAndSend("/topic/jobStatus", job);
                                 if (JobStatus.RUNNING.getValue().equals(jobStatus)) {
                                     updateJobStatusAndStartTime(jobId, jobStatus, startTime);
                                 } else if (JobStatus.FINISHED.getValue().equals(jobStatus)
@@ -426,6 +433,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
                         .type(jobSubmitDTO.getTaskType())
                         .statements(jobSubmitDTO.getStatements())
                         .status(JobStatus.CREATED.getValue())
+                        .fileName(jobSubmitDTO.getFileName())
                         .clusterId(jobSubmitDTO.getClusterId());
 
         String jobName = jobSubmitDTO.getJobName() != null ? jobSubmitDTO.getJobName() : "";
@@ -466,6 +474,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
                         .resultData(executionResult.getData())
                         .clusterId(jobSubmitDTO.getClusterId())
                         .jobName(jobSubmitDTO.getJobName())
+                        .fileName(jobSubmitDTO.getFileName())
                         .token(0L);
         String executeMode = jobSubmitDTO.isStreaming() ? STREAMING_MODE : BATCH_MODE;
         builder.executeMode(executeMode);
diff --git a/paimon-web-server/src/main/resources/application.yml b/paimon-web-server/src/main/resources/application.yml
index c0128576..96fe26c9 100644
--- a/paimon-web-server/src/main/resources/application.yml
+++ b/paimon-web-server/src/main/resources/application.yml
@@ -83,4 +83,9 @@ interceptor:
     path:
       patterns: /api/login, /ui/**
 
-
+websocket:
+  pool:
+    size: 1
+  heartbeat:
+    send: 30000
+    receive: 30000
diff --git a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
index c6a6a403..4f371f98 100644
--- a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
+++ b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/JobControllerTest.java
@@ -390,8 +390,8 @@ public class JobControllerTest extends FlinkSQLGatewayTestBase {
                 ObjectMapperUtils.fromJSON(
                         getJobStatisticsResponseStr, new TypeReference<R<JobStatisticsVO>>() {});
         assertEquals(200, getJobStatisticsRes.getCode());
-        assertEquals(6, getJobStatisticsRes.getData().getTotalNum());
-        assertEquals(5, getJobStatisticsRes.getData().getRunningNum());
+        assertEquals(7, getJobStatisticsRes.getData().getTotalNum());
+        assertEquals(6, getJobStatisticsRes.getData().getRunningNum());
         assertEquals(1, getJobStatisticsRes.getData().getCanceledNum());
         assertEquals(0, getJobStatisticsRes.getData().getFinishedNum());
         assertEquals(0, getJobStatisticsRes.getData().getFailedNum());
diff --git a/scripts/sql/paimon-mysql.sql b/scripts/sql/paimon-mysql.sql
index b7b51415..b35e79ab 100644
--- a/scripts/sql/paimon-mysql.sql
+++ b/scripts/sql/paimon-mysql.sql
@@ -160,6 +160,7 @@ CREATE TABLE if not exists `job`
     `id`          int(11)     not null auto_increment primary key comment 'id',
     `job_id`     varchar(100)  not null comment 'job id',
     `job_name`     varchar(200) comment 'job name',
+    `file_name`     varchar(200) comment 'file name',
     `type`     varchar(100)  comment 'job type',
     `execute_mode`     varchar(50)  comment 'execute mode',
     `cluster_id`     varchar(100)  comment 'cluster id',

Reply via email to