This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ef78fddfc [Feature-3934] Add zookeeper registry for streampark (#4127)
ef78fddfc is described below

commit ef78fddfcc9257165353a689317976e20ec7839d
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Oct 30 13:45:48 2024 +0800

    [Feature-3934] Add zookeeper registry for streampark (#4127)
    
    * zookeeper registry
    
    * add registry interface comment
    
    * handle InterruptedException
    
    * refactor
---
 pom.xml                                            |   7 +
 .../src/main/assembly/conf/config.yaml             |  12 +-
 .../console/StreamParkConsoleBootstrap.java        |   1 -
 .../console/core/runner/EnvInitializer.java        |  17 ++
 .../console/core/service/RegistryService.java      |  31 ++++
 .../core/service/impl/RegistryServiceImpl.java     | 206 +++++++++++++++++++++
 .../console/core/service/RegistryServiceTest.java  |  46 +++++
 tools/dependencies/known-dependencies.txt          |   4 +-
 8 files changed, 317 insertions(+), 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0c9fdbc15..4a65812ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
         <hadoop.version>3.3.4</hadoop.version>
         <hbase.version>2.1.10</hbase.version>
         <redis.version>3.3.0</redis.version>
+        <zoopkeeper.version>3.6.3</zoopkeeper.version>
         <es.version>6.2.3</es.version>
         <influxdb.version>2.17</influxdb.version>
         <protobuf.version>2.5.0</protobuf.version>
@@ -242,6 +243,12 @@
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.zookeeper</groupId>
+                <artifactId>zookeeper</artifactId>
+                <version>${zoopkeeper.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>redis.clients</groupId>
                 <artifactId>jedis</artifactId>
diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
index 084e99112..91e843a2b 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
+++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
@@ -100,11 +100,13 @@ sso:
         # Optional, change by authentication client
         # Please replace and fill in your client config below when enabled SSO
 
-registry:
-  # default using jdbc as registry
-  type: jdbc
-  heartbeat-refresh-interval: 1s
-  session-timeout: 3s
+high-availability:
+  enable: false # true
+  # The list of ZooKeeper quorum peers that coordinate the high-availability
+  # setup. This must be a list of the form:
+  # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+  #
+  zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181
 
 network:
   # network interface preferred like eth0, default: empty
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index 601fc7f00..ad1e279e8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -54,5 +54,4 @@ public class StreamParkConsoleBootstrap {
             .sources(StreamParkConsoleBootstrap.class)
             .run(args);
     }
-
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac..ebcaef12f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -26,9 +26,11 @@ import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.base.util.SpringContextUtils;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.SparkEnv;
+import org.apache.streampark.console.core.service.RegistryService;
 import org.apache.streampark.console.core.service.SettingService;
 
 import org.apache.commons.lang3.StringUtils;
@@ -81,6 +83,9 @@ public class EnvInitializer implements ApplicationRunner {
         // init InternalConfig
         initConfig();
 
+        // init RegistryService
+        initRegistryService();
+
         boolean isTest = Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test");
         if (!isTest) {
             // initialize local file system resources
@@ -110,6 +115,18 @@ public class EnvInitializer implements ApplicationRunner {
         overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
     }
 
+    private void initRegistryService() {
+        boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
+        if (enable) {
+            RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
+            registryService.registry();
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                registryService.unRegister();
+                log.info("RegistryService unRegister success");
+            }));
+        }
+    }
+
     private void overrideSystemProp(String key, String defaultValue) {
         String value = context.getEnvironment().getProperty(key, defaultValue);
         log.info("initialize system properties: key:{}, value:{}", key, value);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
new file mode 100644
index 000000000..d504be67a
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+public interface RegistryService {
+
+    /**
+     * Register the service.
+     */
+    void registry();
+
+    /**
+     * Close the registry service.
+     */
+    void unRegister();
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
new file mode 100644
index 000000000..f645267d7
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.core.service.DistributedTaskService;
+import org.apache.streampark.console.core.service.RegistryService;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+@Slf4j
+@Service
+public class RegistryServiceImpl implements RegistryService {
+
+    private static final String REGISTRY_PATH = "/services";
+    private static final int HEARTBEAT_INTERVAL = 10000;
+    private static final int HEARTBEAT_TIMEOUT = 60000;
+
+    private String zkAddress;
+    private ZooKeeper zk;
+    private String nodePath;
+    private Watcher watcher = event -> {
+        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
+            && event.getPath().equals(REGISTRY_PATH)) {
+            handleNodeChanges();
+        }
+    };
+
+    @Getter
+    private Set<String> currentNodes = new HashSet<>();
+
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
+
+    public void registry() {
+        try {
+            zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
+            zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
+
+            if (zk.exists(REGISTRY_PATH, false) == null) {
+                zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            String ip = InetAddress.getLocalHost().getHostAddress();
+            String port = SystemPropertyUtils.get("server.port", "10000");
+            String server_id = ip + ":" + port;
+            nodePath = zk.create(REGISTRY_PATH + "/" + server_id, new byte[0],
+                OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+            currentNodes.add(nodePath);
+
+            doRegister();
+        } catch (Exception e) {
+            log.error("Failed to init ZooKeeper client", e);
+        }
+    }
+
+    public void doRegister() {
+        try {
+            distributedTaskService.init(currentNodes, nodePath);
+            startHeartbeat();
+            startHeartbeatChecker();
+            handleNodeChanges();
+            log.info("ZooKeeper client started: {}", nodePath);
+        } catch (Exception e) {
+            log.error("Failed to start ZooKeeper client", e);
+        }
+    }
+
+    private void startHeartbeat() {
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                zk.setData(nodePath, new byte[0], -1);
+                log.info("Heartbeat updated for node: {}", nodePath);
+            } catch (KeeperException e) {
+                log.info("Zookeeper session expired, attempting to reconnect...");
+                reconnectAndRegister();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Failed to update heartbeat for node: {}", nodePath, e);
+            }
+        }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    private void startHeartbeatChecker() {
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                long now = System.currentTimeMillis();
+                List<String> servers = zk.getChildren(REGISTRY_PATH, false);
+                for (String server : servers) {
+                    String serverPath = REGISTRY_PATH + "/" + server;
+                    Stat stat = zk.exists(serverPath, false);
+                    if (stat != null && (now - stat.getMtime() > HEARTBEAT_TIMEOUT)) {
+                        zk.delete(serverPath, -1);
+                        log.info("Deleted stale node: {}", serverPath);
+                    }
+                }
+            } catch (KeeperException e) {
+                log.info("Zookeeper session expired, attempting to reconnect...");
+                reconnectAndRegister();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Failed to check heartbeat", e);
+            }
+        }, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void handleNodeChanges() {
+        try {
+            List<String> nodes = zk.getChildren(REGISTRY_PATH, true);
+            Set<String> newNodes = new HashSet<>(nodes);
+
+            for (String node : newNodes) {
+                if (!currentNodes.contains(node)) {
+                    log.info("Node added: {}", node);
+                    distributedTaskService.addServer(node);
+                }
+            }
+
+            for (String node : currentNodes) {
+                if (!newNodes.contains(node)) {
+                    log.info("Node removed: {}", node);
+                    distributedTaskService.removeServer(node);
+                }
+            }
+
+            currentNodes = newNodes;
+            log.info("Online servers: {}", currentNodes);
+        } catch (KeeperException e) {
+            log.info("Zookeeper session expired, attempting to reconnect...");
+            reconnectAndRegister();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Failed to handle node changes", e);
+        }
+    }
+
+    private void reconnectAndRegister() {
+        int retries = 5;
+        while (retries > 0) {
+            try {
+                zk.close();
+                zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
+                zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                return;
+            } catch (Exception e) {
+                retries--;
+                log.warn("Retrying connection, attempts left: {}", retries, e);
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        log.error("Failed to reconnect and register node after multiple attempts.");
+    }
+
+    @Override
+    public void unRegister() {
+        try {
+            zk.close();
+            scheduler.shutdown();
+            log.info("ZooKeeper client closed: {}", nodePath);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Failed to close ZooKeeper client", e);
+        }
+    }
+}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
new file mode 100644
index 000000000..66e3c73a7
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RegistryServiceTest {
+
+    private final RegistryServiceImpl registryService = new RegistryServiceImpl();
+
+    @Test
+    public void testRegister() {
+        if (enableHA()) {
+            try {
+                registryService.registry();
+            } catch (Exception e) {
+                Assertions.assertEquals(1, registryService.getCurrentNodes().size());
+                registryService.unRegister();
+            }
+        }
+    }
+
+    public boolean enableHA() {
+        return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
+    }
+
+}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index e685de5ad..5bc70347e 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -377,5 +377,7 @@ xml-apis-1.4.01.jar
 xnio-api-3.8.7.Final.jar
 xnio-nio-3.8.7.Final.jar
 xz-1.5.jar
-zookeeper-3.4.14.jar
+zookeeper-3.6.3.jar
 icu4j-67.1.jar
+zookeeper-jute-3.6.3.jar
+netty-transport-native-epoll-4.1.91.Final.jar

Reply via email to