This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch notify_message in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit e25c778bead39688589980f13abbb5240214a066 Author: benjobs <[email protected]> AuthorDate: Mon Aug 5 17:07:39 2024 +0800 [Improve] notify message improvement --- .../streampark-console-service/pom.xml | 9 +-- .../console/base/config/WebSocketConfig.java | 30 -------- .../console/core/controller/MessageController.java | 4 +- .../console/core/service/MessageService.java | 2 - .../core/service/impl/AppBuildPipeServiceImpl.java | 1 - .../core/service/impl/MessageServiceImpl.java | 7 -- .../console/core/websocket/WebSocketEndpoint.java | 89 ---------------------- .../src/api/system/notify.ts | 4 +- .../default/header/components/notify/index.vue | 3 +- 9 files changed, 8 insertions(+), 141 deletions(-) diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index f906b4500..bed74a135 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -43,9 +43,9 @@ <docker.client.version>3.2.13</docker.client.version> - <commons-compress.version>1.21</commons-compress.version> + <commons-compress.version>1.26.0</commons-compress.version> <javax-mail.version>1.4.7</javax-mail.version> - <jsch.version>0.2.11</jsch.version> + <jsch.version>0.2.18</jsch.version> <shiro.version>1.10.0</shiro.version> <p6spy.version>3.9.1</p6spy.version> <freemarker.version>2.3.32</freemarker.version> @@ -183,11 +183,6 @@ <artifactId>spring-boot-starter-actuator</artifactId> </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-websocket</artifactId> - </dependency> - <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebSocketConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebSocketConfig.java deleted file mode 100644 index 8fa4dc53a..000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebSocketConfig.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.base.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.server.standard.ServerEndpointExporter; - -@Configuration -public class WebSocketConfig { - @Bean - public ServerEndpointExporter serverEndpointExporter() { - return new ServerEndpointExporter(); - } -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/MessageController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/MessageController.java index f13fe254c..da1601175 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/MessageController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/MessageController.java @@ -46,8 +46,8 @@ public class MessageController { return RestResponse.success(pages); } - @PostMapping("delnotice") - public RestResponse delNotice(Long id) { + @PostMapping("clear") + public RestResponse clear(Long id) { return RestResponse.success(messageService.removeById(id)); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/MessageService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/MessageService.java index a26fd8be7..f29291f19 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/MessageService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/MessageService.java @@ -26,7 +26,5 @@ import com.baomidou.mybatisplus.extension.service.IService; public interface MessageService extends IService<Message> { - void push(Message message); - IPage<Message> getUnRead(NoticeType noticeType, RestRequest request); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 289353511..a0868ef10 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -256,7 +256,6 @@ public class AppBuildPipeServiceImpl app.getJobName().concat(" release failed"), Utils.stringifyException(snapshot.error().exception()), NoticeType.EXCEPTION); - messageService.push(message); app.setRelease(ReleaseState.FAILED.get()); app.setOptionState(OptionState.NONE.getValue()); app.setBuild(true); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java index 497390136..31b83aead 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/MessageServiceImpl.java @@ -23,7 +23,6 @@ import org.apache.streampark.console.core.entity.Message; import org.apache.streampark.console.core.enums.NoticeType; import org.apache.streampark.console.core.mapper.MessageMapper; import org.apache.streampark.console.core.service.MessageService; -import org.apache.streampark.console.core.websocket.WebSocketEndpoint; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -40,12 +39,6 @@ import org.springframework.transaction.annotation.Transactional; public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message> implements MessageService { - @Override - public void push(Message message) { - save(message); - WebSocketEndpoint.pushNotice(message); - } - @Override public IPage<Message> getUnRead(NoticeType noticeType, RestRequest request) { Page<Message> page = MybatisPager.getPage(request); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java deleted file mode 100644 index 1524a6ca4..000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/websocket/WebSocketEndpoint.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.core.websocket; - -import org.apache.streampark.console.core.entity.Message; - -import io.undertow.util.CopyOnWriteMap; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import javax.websocket.OnClose; -import javax.websocket.OnError; -import javax.websocket.OnOpen; -import javax.websocket.Session; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; - -import java.io.IOException; -import java.util.Map; - -@Slf4j -@Component -@ServerEndpoint(value = "/websocket/{id}") -public class WebSocketEndpoint { - - private static final Map<String, Session> SOCKET_SESSIONS = new CopyOnWriteMap<>(); - - @Getter private String id; - - @Getter private Session session; - - @OnOpen - public void onOpen(Session session, @PathParam("id") String id) { - log.debug("websocket onOpen...."); - this.id = id; - this.session = session; - SOCKET_SESSIONS.put(id, session); - } - - @OnClose - public void onClose() throws IOException { - log.debug("websocket onClose...."); - this.session.close(); - SOCKET_SESSIONS.remove(this.id); - } - - @OnError - public void onError(Session session, Throwable e) { - log.error(e.getMessage(), e); - } - - public static void writeMessage(String socketId, String message) { - try { - Session session = SOCKET_SESSIONS.get(socketId); - if (session != null) { - session.getBasicRemote().sendText(message); - } - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } - - public static void pushNotice(Message message) { - try { - Session session = SOCKET_SESSIONS.get(message.getUserId().toString()); - if (session != null) { - session.getBasicRemote().sendObject(message); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } -} diff --git a/streampark-console/streampark-console-webapp/src/api/system/notify.ts b/streampark-console/streampark-console-webapp/src/api/system/notify.ts index 3ed4e6758..e58dd2368 100644 --- a/streampark-console/streampark-console-webapp/src/api/system/notify.ts +++ b/streampark-console/streampark-console-webapp/src/api/system/notify.ts @@ -27,7 +27,7 @@ interface NotifyParam { } enum NOTIFY_API { NOTICE = '/message/notice', - DEL = '/message/delnotice', + CLEAR = '/message/clear', } /** * Get notification list @@ -45,7 +45,7 @@ export function fetchNotify(data: NotifyParam): Promise<NotifyList> { */ export function fetchNotifyDelete(id: string): Promise<NotifyList> { return defHttp.post({ - url: NOTIFY_API.DEL, + url: NOTIFY_API.CLEAR, data: { id }, }); } diff --git a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue index 1bbaa3293..ca016fdee 100644 --- a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue +++ b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue @@ -93,7 +93,7 @@ try { notifyLoading.value = true; await fetchNotifyDelete(id); - getNotifyList(unref(notifyType)); + await getNotifyList(unref(notifyType)); } catch (error) { console.error(error); } finally { @@ -135,6 +135,7 @@ }, }, }); + watch([data, currentPage], ([newData]: [NotifyItem], [newPage]) => { if (newData && isObject(newData)) { /* The abnormal alarm */
