This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 390db1ec2 [Improve] Support multi-node deployment (#4080)
390db1ec2 is described below
commit 390db1ec252d7a9032d9bf969860edca87b69ef1
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Sep 24 14:28:20 2024 +0800
[Improve] Support multi-node deployment (#4080)
* Support multi-node deployment
* rename
---
.../core/registry/ConsoleRegistryClient.java | 39 +++++++++++++++++++-
.../core/registry/ConsoleRegistryDataListener.java | 1 +
.../core/service/DistributedTaskService.java | 16 +++++----
.../service/impl/DistributedTaskServiceImpl.java | 41 ++++++++++++----------
.../core/service/DistributedTaskServiceTest.java | 11 ++++--
5 files changed, 79 insertions(+), 29 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
index d04b35104..b9c70be51 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryClient.java
@@ -36,6 +36,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Set;
+
import static org.apache.streampark.common.constants.Constants.SLEEP_TIME_MILLIS;
/**
@@ -83,6 +85,36 @@ public class ConsoleRegistryClient implements AutoCloseable {
deregister();
}
+ /**
+ * add console node path
+ *
+ * @param path node path
+ * @param nodeType node type
+ */
+ public void addConsoleNodePath(String path, RegistryNodeType nodeType) {
+ log.info("{} node added : {}", nodeType, path);
+
+ if (StringUtils.isEmpty(path)) {
+ log.error("server start error: empty path: {}, nodeType:{}", path, nodeType);
+ return;
+ }
+
+ String serverHost = registryClient.getHostByEventDataPath(path);
+ if (StringUtils.isEmpty(serverHost)) {
+ log.error("server start error: unknown path: {}, nodeType:{}", path, nodeType);
+ return;
+ }
+
+ try {
+ if (!registryClient.exists(path)) {
+ log.info("path: {} not exists", path);
+ }
+ distributedTaskService.addServer(serverHost);
+ } catch (Exception e) {
+ log.error("{} server failover failed, host:{}", nodeType, serverHost, e);
+ }
+ }
+
/**
* remove console node path
*
@@ -142,7 +174,8 @@ public class ConsoleRegistryClient implements AutoCloseable {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
consoleHeartBeatTask.start();
- distributedTaskService.init(consoleConfig.getConsoleAddress());
+ Set<String> allServers = getServerNodes(RegistryNodeType.CONSOLE_SERVER);
+ distributedTaskService.init(allServers, consoleConfig.getConsoleAddress());
log.info("Console node : {} registered to registry center successfully", consoleConfig.getConsoleAddress());
}
@@ -163,4 +196,8 @@ public class ConsoleRegistryClient implements AutoCloseable {
public boolean isAvailable() {
return registryClient.isConnected();
}
+
+ public Set<String> getServerNodes(RegistryNodeType nodeType) {
+ return registryClient.getServerNodeSet(nodeType);
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java
index 35e143e76..cb40aa072 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/registry/ConsoleRegistryDataListener.java
@@ -52,6 +52,7 @@ public class ConsoleRegistryDataListener implements SubscribeListener {
switch (event.type()) {
case ADD:
log.info("console node added : {}", path);
+ consoleRegistryClient.addConsoleNodePath(path, RegistryNodeType.CONSOLE_SERVER);
break;
case REMOVE:
log.info("console node deleted : {}", path);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
index 7b29ef08c..70cf53968 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DistributedTaskService.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.core.enums.DistributedTaskEnum;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
+import java.util.Set;
/**
* DistributedTaskService is the interface for managing tasks.
@@ -31,10 +32,11 @@ import java.util.List;
public interface DistributedTaskService extends IService<DistributedTask> {
/**
- * Add the current console server itself to the consistent hash ring.
- * @param serverName String
+ * Initialize the consistent hash ring.
+ * @param allServers All servers
+ * @param serverId The name of the current server
*/
- void init(String serverName);
+ void init(Set<String> allServers, String serverId);
/**
* This interface is responsible for polling the database to retrieve task records and execute the corresponding operations.
@@ -51,15 +53,15 @@ public interface DistributedTaskService extends IService<DistributedTask> {
/**
* This interface handles task redistribution when server nodes are added.
- * @param server String
+ * @param serverId String
*/
- void addServerRedistribute(String server);
+ void addServer(String serverId);
/**
* This interface handles task redistribution when server nodes are removed.
- * @param server String
+ * @param serverId String
*/
- void removeServerRedistribute(String server);
+ void removeServer(String serverId);
/**
* Determine whether the task is processed locally.
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
index 10aa929ec..2af70aed4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DistributedTaskServiceImpl.java
@@ -40,6 +40,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@@ -51,8 +52,15 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
implements
DistributedTaskService {
+ @Qualifier("streamparkDistributedTaskExecutor")
+ @Autowired
+ private Executor taskExecutor;
+
+ @Autowired
+ private ApplicationActionService applicationActionService;
+
/**
- * Server name
+ * Server Id
*/
private String serverId;
@@ -61,24 +69,21 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
*/
private final ConsistentHash<String> consistentHash = new ConsistentHash<>(Collections.emptyList());
- @Qualifier("streamparkDistributedTaskExecutor")
- @Autowired
- private Executor taskExecutor;
-
- @Autowired
- private ApplicationActionService applicationActionService;
-
/**
* Task execution status
*/
private final ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();
/**
- * Add the current console server itself to the consistent hash ring.
+ * Initialize the consistent hash ring.
+ * @param allServers All servers
+ * @param serverId The name of the current server
*/
- public void init(String serverName) {
- this.serverId = serverName;
- consistentHash.add(serverName);
+ public void init(Set<String> allServers, String serverId) {
+ this.serverId = serverId;
+ for (String server : allServers) {
+ consistentHash.add(server);
+ }
}
@Scheduled(fixedDelay = 50)
@@ -151,21 +156,21 @@ public class DistributedTaskServiceImpl extends ServiceImpl<DistributedTaskMappe
/**
* This interface handles task redistribution when server nodes are added.
*
- * @param server String
+ * @param serverId String
*/
@Override
- public void addServerRedistribute(String server) {
-
+ public void addServer(String serverId) {
+ consistentHash.add(serverId);
}
/**
* This interface handles task redistribution when server nodes are removed.
*
- * @param server String
+ * @param serverId String
*/
@Override
- public void removeServerRedistribute(String server) {
-
+ public void removeServer(String serverId) {
+ consistentHash.remove(serverId);
}
/**
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
index 3c7d47837..27c00472f 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/DistributedTaskServiceTest.java
@@ -27,25 +27,30 @@ import com.fasterxml.jackson.core.JacksonException;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
@Slf4j
class DistributedTaskServiceTest {
private final DistributedTaskServiceImpl distributionTaskService = new DistributedTaskServiceImpl();
- private final String serverName = "testServer";
+ private final String serverId = "testServer";
+ private final Set<String> allServers = new HashSet<>(Collections.singleton(serverId));
// the number of virtual nodes for each server
private final int numberOfReplicas = 2 << 16;
@Test
void testInit() {
- distributionTaskService.init(serverName);
+ distributionTaskService.init(allServers, serverId);
assert (distributionTaskService.getConsistentHashSize() == numberOfReplicas);
}
@Test
void testIsLocalProcessing() {
- distributionTaskService.init(serverName);
+ distributionTaskService.init(allServers, serverId);
for (long i = 0; i < numberOfReplicas; i++) {
assert (distributionTaskService.isLocalProcessing(i));
}