This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 390db1ec2 [Improve] Support multi-node deployment (#4080)
390db1ec2 is described below

commit 390db1ec252d7a9032d9bf969860edca87b69ef1
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Sep 24 14:28:20 2024 +0800

    [Improve] Support multi-node deployment (#4080)
    
    * Support multi-node deployment
    
    * rename
---
 .../core/registry/ConsoleRegistryClient.java       | 39 +++++++++++++++++++-
 .../core/registry/ConsoleRegistryDataListener.java |  1 +
 .../core/service/DistributedTaskService.java       | 16 +++++----
 .../service/impl/DistributedTaskServiceImpl.java   | 41 ++++++++++++----------
 .../core/service/DistributedTaskServiceTest.java   | 11 ++++--
 5 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
index d04b35104..b9c70be51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
@@ -36,6 +36,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Set;
+
 import static org.apache.streampark.common.constants.Constants.SLEEP_TIME_MILLIS;
 
 /**
@@ -83,6 +85,36 @@ public class ConsoleRegistryClient implements AutoCloseable {
         deregister();
     }
 
+    /**
+     * add console node path
+     *
+     * @param path     node path
+     * @param nodeType node type
+     */
+    public void addConsoleNodePath(String path, RegistryNodeType nodeType) {
+        log.info("{} node added : {}", nodeType, path);
+
+        if (StringUtils.isEmpty(path)) {
+            log.error("server start error: empty path: {}, nodeType:{}", path, nodeType);
+            return;
+        }
+
+        String serverHost = registryClient.getHostByEventDataPath(path);
+        if (StringUtils.isEmpty(serverHost)) {
+            log.error("server start error: unknown path: {}, nodeType:{}", path, nodeType);
+            return;
+        }
+
+        try {
+            if (!registryClient.exists(path)) {
+                log.info("path: {} not exists", path);
+            }
+            distributedTaskService.addServer(serverHost);
+        } catch (Exception e) {
+            log.error("{} server failover failed, host:{}", nodeType, serverHost, e);
+        }
+    }
+
     /**
      * remove console node path
      *
@@ -142,7 +174,8 @@ public class ConsoleRegistryClient implements AutoCloseable {
         ThreadUtils.sleep(SLEEP_TIME_MILLIS);
 
         consoleHeartBeatTask.start();
-        distributedTaskService.init(consoleConfig.getConsoleAddress());
+        Set<String> allServers = getServerNodes(RegistryNodeType.CONSOLE_SERVER);
+        distributedTaskService.init(allServers, consoleConfig.getConsoleAddress());
         log.info("Console node : {} registered to registry center successfully", consoleConfig.getConsoleAddress());
 
     }
@@ -163,4 +196,8 @@ public class ConsoleRegistryClient implements AutoCloseable {
     public boolean isAvailable() {
         return registryClient.isConnected();
     }
+
+    public Set<String> getServerNodes(RegistryNodeType nodeType) {
+        return registryClient.getServerNodeSet(nodeType);
+    }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java
index 35e143e76..cb40aa072 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java
@@ -52,6 +52,7 @@ public class ConsoleRegistryDataListener implements SubscribeListener {
         switch (event.type()) {
             case ADD:
                 log.info("console node added : {}", path);
+                consoleRegistryClient.addConsoleNodePath(path, RegistryNodeType.CONSOLE_SERVER);
                 break;
             case REMOVE:
                 log.info("console node deleted : {}", path);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
index 7b29ef08c..70cf53968 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.enums.DistributedTaskEnum;
 import com.baomidou.mybatisplus.extension.service.IService;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * DistributedTaskService is the interface for managing tasks.
@@ -31,10 +32,11 @@ import java.util.List;
 public interface DistributedTaskService extends IService<DistributedTask> {
 
     /**
-     * Add the current console server itself to the consistent hash ring.
-     * @param serverName String
+     * Initialize the consistent hash ring.
+     * @param allServers All servers
+     * @param serverId The name of the current server
      */
-    void init(String serverName);
+    void init(Set<String> allServers, String serverId);
 
     /**
      * This interface is responsible for polling the database to retrieve task records and execute the corresponding operations.
@@ -51,15 +53,15 @@ public interface DistributedTaskService extends IService<DistributedTask> {
 
     /**
      * This interface handles task redistribution when server nodes are added.
-     * @param server String
+     * @param serverId String
      */
-    void addServerRedistribute(String server);
+    void addServer(String serverId);
 
     /**
      * This interface handles task redistribution when server nodes are removed.
-     * @param server String
+     * @param serverId String
      */
-    void removeServerRedistribute(String server);
+    void removeServer(String serverId);
 
     /**
      * Determine whether the task is processed locally.
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
index 10aa929ec..2af70aed4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
@@ -40,6 +40,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -51,8 +52,15 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
     implements
         DistributedTaskService {
 
+    @Qualifier("streamparkDistributedTaskExecutor")
+    @Autowired
+    private Executor taskExecutor;
+
+    @Autowired
+    private ApplicationActionService applicationActionService;
+
     /**
-     * Server name
+     * Server Id
      */
     private String serverId;
 
@@ -61,24 +69,21 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
      */
     private final ConsistentHash<String> consistentHash = new ConsistentHash<>(Collections.emptyList());
 
-    @Qualifier("streamparkDistributedTaskExecutor")
-    @Autowired
-    private Executor taskExecutor;
-
-    @Autowired
-    private ApplicationActionService applicationActionService;
-
     /**
      * Task execution status
      */
     private final ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();
 
     /**
-     * Add the current console server itself to the consistent hash ring.
+     * Initialize the consistent hash ring.
+     * @param allServers All servers
+     * @param serverId The name of the current server
      */
-    public void init(String serverName) {
-        this.serverId = serverName;
-        consistentHash.add(serverName);
+    public void init(Set<String> allServers, String serverId) {
+        this.serverId = serverId;
+        for (String server : allServers) {
+            consistentHash.add(server);
+        }
     }
 
     @Scheduled(fixedDelay = 50)
@@ -151,21 +156,21 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
     /**
      * This interface handles task redistribution when server nodes are added.
      *
-     * @param server String
+     * @param serverId String
      */
     @Override
-    public void addServerRedistribute(String server) {
-
+    public void addServer(String serverId) {
+        consistentHash.add(serverId);
     }
 
     /**
      * This interface handles task redistribution when server nodes are removed.
      *
-     * @param server String
+     * @param serverId String
      */
     @Override
-    public void removeServerRedistribute(String server) {
-
+    public void removeServer(String serverId) {
+        consistentHash.remove(serverId);
     }
 
     /**
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
index 3c7d47837..27c00472f 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
@@ -27,25 +27,30 @@ import com.fasterxml.jackson.core.JacksonException;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 @Slf4j
 class DistributedTaskServiceTest {
 
     private final DistributedTaskServiceImpl distributionTaskService = new DistributedTaskServiceImpl();
 
-    private final String serverName = "testServer";
+    private final String serverId = "testServer";
+    private final Set<String> allServers = new HashSet<>(Collections.singleton(serverId));
 
     // the number of virtual nodes for each server
     private final int numberOfReplicas = 2 << 16;
 
     @Test
     void testInit() {
-        distributionTaskService.init(serverName);
+        distributionTaskService.init(allServers, serverId);
         assert (distributionTaskService.getConsistentHashSize() == numberOfReplicas);
     }
 
     @Test
     void testIsLocalProcessing() {
-        distributionTaskService.init(serverName);
+        distributionTaskService.init(allServers, serverId);
         for (long i = 0; i < numberOfReplicas; i++) {
             assert (distributionTaskService.isLocalProcessing(i));
         }

Reply via email to