This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ef78fddfc [Feature-3934] Add zookeeper registry for streampark (#4127)
ef78fddfc is described below
commit ef78fddfcc9257165353a689317976e20ec7839d
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Oct 30 13:45:48 2024 +0800
[Feature-3934] Add zookeeper registry for streampark (#4127)
* zookeeper registry
* add registry interface comment
* handle InterruptedException
* refactor
---
pom.xml | 7 +
.../src/main/assembly/conf/config.yaml | 12 +-
.../console/StreamParkConsoleBootstrap.java | 1 -
.../console/core/runner/EnvInitializer.java | 17 ++
.../console/core/service/RegistryService.java | 31 ++++
.../core/service/impl/RegistryServiceImpl.java | 206 +++++++++++++++++++++
.../console/core/service/RegistryServiceTest.java | 46 +++++
tools/dependencies/known-dependencies.txt | 4 +-
8 files changed, 317 insertions(+), 7 deletions(-)
diff --git a/pom.xml b/pom.xml
index 0c9fdbc15..4a65812ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<hadoop.version>3.3.4</hadoop.version>
<hbase.version>2.1.10</hbase.version>
<redis.version>3.3.0</redis.version>
+ <zookeeper.version>3.6.3</zookeeper.version>
<es.version>6.2.3</es.version>
<influxdb.version>2.17</influxdb.version>
<protobuf.version>2.5.0</protobuf.version>
@@ -242,6 +243,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ </dependency>
+
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
index 084e99112..91e843a2b 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
+++
b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
@@ -100,11 +100,13 @@ sso:
# Optional, change by authentication client
# Please replace and fill in your client config below when enabled SSO
-registry:
- # default using jdbc as registry
- type: jdbc
- heartbeat-refresh-interval: 1s
- session-timeout: 3s
+high-availability:
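+ enable: false # set to true to enable ZooKeeper-based service registration
+ # The list of ZooKeeper quorum peers that coordinate the high-availability
+ # setup. This must be a list of the form:
+ # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+ # The addresses below are examples; replace them with your own quorum.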
+ zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181
network:
# network interface preferred like eth0, default: empty
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index 601fc7f00..ad1e279e8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -54,5 +54,4 @@ public class StreamParkConsoleBootstrap {
.sources(StreamParkConsoleBootstrap.class)
.run(args);
}
-
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac..ebcaef12f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -26,9 +26,11 @@ import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.SparkEnv;
+import org.apache.streampark.console.core.service.RegistryService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.commons.lang3.StringUtils;
@@ -81,6 +83,9 @@ public class EnvInitializer implements ApplicationRunner {
// init InternalConfig
initConfig();
+ // init RegistryService
+ initRegistryService();
+
boolean isTest =
Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test");
if (!isTest) {
// initialize local file system resources
@@ -110,6 +115,18 @@ public class EnvInitializer implements ApplicationRunner {
overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
}
+    /**
+     * Registers this console with the {@link RegistryService} when
+     * {@code high-availability.enable} is "true", and installs a JVM shutdown hook that
+     * deregisters it again. Does nothing otherwise.
+     */
+    private void initRegistryService() {
+        String haEnabled = SystemPropertyUtils.get("high-availability.enable", "false");
+        if (!haEnabled.equals("true")) {
+            // High availability disabled: skip registration entirely.
+            return;
+        }
+        RegistryService service = SpringContextUtils.getBean(RegistryService.class);
+        service.registry();
+        Runnable deregister = () -> {
+            service.unRegister();
+            log.info("RegistryService unRegister success");
+        };
+        Runtime.getRuntime().addShutdownHook(new Thread(deregister));
+    }
+
private void overrideSystemProp(String key, String defaultValue) {
String value = context.getEnvironment().getProperty(key, defaultValue);
log.info("initialize system properties: key:{}, value:{}", key, value);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
new file mode 100644
index 000000000..d504be67a
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+/**
+ * Membership registry for console servers. Used by EnvInitializer when
+ * {@code high-availability.enable} is "true".
+ */
+public interface RegistryService {
+
+    /**
+     * Registers the current server with the registry so that it becomes visible to the other
+     * registered servers. Called by EnvInitializer during application startup.
+     */
+    void registry();
+
+    /**
+     * Removes the current server from the registry and releases whatever {@link #registry()}
+     * acquired. Called from a JVM shutdown hook installed by EnvInitializer.
+     */
+    void unRegister();
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
new file mode 100644
index 000000000..f645267d7
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.core.service.DistributedTaskService;
+import org.apache.streampark.console.core.service.RegistryService;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+/**
+ * ZooKeeper-backed {@link RegistryService}. Each console server registers an ephemeral znode
+ * {@code /services/<ip>:<port>}, refreshes it periodically as a heartbeat, and keeps
+ * {@link DistributedTaskService} informed about which servers are online.
+ */
+@Slf4j
+@Service
+public class RegistryServiceImpl implements RegistryService {
+
+    /** Persistent parent znode; every registered server owns one ephemeral child under it. */
+    private static final String REGISTRY_PATH = "/services";
+    /** Milliseconds between two heartbeat writes to this server's znode. */
+    private static final int HEARTBEAT_INTERVAL = 10000;
+    /** ZooKeeper session timeout, and the znode age (ms) after which the checker deletes it. */
+    private static final int HEARTBEAT_TIMEOUT = 60000;
+    /** Number of reconnect attempts before giving up. */
+    private static final int RECONNECT_ATTEMPTS = 5;
+    /** Pause (ms) between two reconnect attempts. */
+    private static final long RECONNECT_BACKOFF_MS = 3000L;
+
+    private String zkAddress;
+    private volatile ZooKeeper zk;
+    /** Full path of this server's znode, i.e. {@code /services/<ip>:<port>}. */
+    private String nodePath;
+    /** Child name of this server's znode ({@code <ip>:<port>}), the form used in {@link #currentNodes}. */
+    private String serverId;
+    /** Set by {@link #unRegister()} so that a late reconnect does not re-create this server's znode. */
+    private volatile boolean stopped;
+
+    /** Refreshes the membership whenever the children of {@link #REGISTRY_PATH} change. */
+    private final Watcher watcher = event -> {
+        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
+            && REGISTRY_PATH.equals(event.getPath())) {
+            handleNodeChanges();
+        }
+    };
+
+    /** Child names (not full paths) of the servers currently registered under {@link #REGISTRY_PATH}. */
+    @Getter
+    private volatile Set<String> currentNodes = new HashSet<>();
+
+    @Autowired
+    private DistributedTaskService distributedTaskService;
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
+
+    /**
+     * Connects to the quorum configured by {@code high-availability.zookeeper.quorum}, creates this
+     * server's ephemeral znode and starts heartbeating and membership tracking. Failures are
+     * logged and not rethrown.
+     */
+    @Override
+    public void registry() {
+        try {
+            zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
+            zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
+
+            if (zk.exists(REGISTRY_PATH, false) == null) {
+                try {
+                    zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                } catch (KeeperException.NodeExistsException ignored) {
+                    // Another server created the parent concurrently; nothing to do.
+                }
+            }
+
+            String ip = InetAddress.getLocalHost().getHostAddress();
+            String port = SystemPropertyUtils.get("server.port", "10000");
+            serverId = ip + ":" + port;
+            nodePath = zk.create(REGISTRY_PATH + "/" + serverId, new byte[0],
+                OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+            // Track the child name, the same form getChildren() returns in refreshNodes(); storing
+            // the full path made the first refresh "remove" this server and re-add it under another id.
+            currentNodes.add(serverId);
+
+            doRegister();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Failed to init ZooKeeper client", e);
+        } catch (Exception e) {
+            log.error("Failed to init ZooKeeper client", e);
+        }
+    }
+
+    /**
+     * Hands the initial membership to {@link DistributedTaskService}, starts the heartbeat and
+     * stale-node checker, and arms the children watch. Failures are logged and not rethrown.
+     */
+    public void doRegister() {
+        try {
+            // Same id form as addServer/removeServer receive from refreshNodes().
+            distributedTaskService.init(currentNodes, serverId);
+            startHeartbeat();
+            startHeartbeatChecker();
+            handleNodeChanges();
+            log.info("ZooKeeper client started: {}", nodePath);
+        } catch (Exception e) {
+            log.error("Failed to start ZooKeeper client", e);
+        }
+    }
+
+    /** Periodically touches this server's znode; any ZooKeeper error triggers a reconnect. */
+    private void startHeartbeat() {
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                zk.setData(nodePath, new byte[0], -1);
+                log.info("Heartbeat updated for node: {}", nodePath);
+            } catch (KeeperException e) {
+                log.info("Zookeeper session expired, attempting to reconnect...");
+                reconnectAndRegister();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Failed to update heartbeat for node: {}", nodePath, e);
+            }
+        }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    /** Periodically deletes registered znodes whose last modification is older than the timeout. */
+    private void startHeartbeatChecker() {
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                // NOTE(review): mtime is stamped by the ZooKeeper server while `now` is this host's
+                // clock, so large clock skew can make live nodes look stale.
+                long now = System.currentTimeMillis();
+                List<String> servers = zk.getChildren(REGISTRY_PATH, false);
+                for (String server : servers) {
+                    String serverPath = REGISTRY_PATH + "/" + server;
+                    try {
+                        Stat stat = zk.exists(serverPath, false);
+                        if (stat != null && (now - stat.getMtime() > HEARTBEAT_TIMEOUT)) {
+                            zk.delete(serverPath, -1);
+                            log.info("Deleted stale node: {}", serverPath);
+                        }
+                    } catch (KeeperException.NoNodeException ignored) {
+                        // Removed meanwhile (session expiry or another checker): not a session problem.
+                    }
+                }
+            } catch (KeeperException e) {
+                log.info("Zookeeper session expired, attempting to reconnect...");
+                reconnectAndRegister();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Failed to check heartbeat", e);
+            }
+        }, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /** Refreshes the membership, reconnecting when ZooKeeper reports an error. */
+    private synchronized void handleNodeChanges() {
+        try {
+            refreshNodes();
+        } catch (KeeperException e) {
+            log.info("Zookeeper session expired, attempting to reconnect...");
+            reconnectAndRegister();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Failed to handle node changes", e);
+        }
+    }
+
+    /**
+     * Re-reads the children of {@link #REGISTRY_PATH} (re-arming the children watch), reports
+     * additions and removals to {@link DistributedTaskService}, and replaces {@link #currentNodes}.
+     */
+    private synchronized void refreshNodes() throws KeeperException, InterruptedException {
+        List<String> children = zk.getChildren(REGISTRY_PATH, true);
+        Set<String> newNodes = new HashSet<>(children);
+
+        for (String node : newNodes) {
+            if (!currentNodes.contains(node)) {
+                log.info("Node added: {}", node);
+                distributedTaskService.addServer(node);
+            }
+        }
+
+        for (String node : currentNodes) {
+            if (!newNodes.contains(node)) {
+                log.info("Node removed: {}", node);
+                distributedTaskService.removeServer(node);
+            }
+        }
+
+        currentNodes = newNodes;
+        log.info("Online servers: {}", currentNodes);
+    }
+
+    /**
+     * Replaces the ZooKeeper session and re-creates this server's ephemeral znode, retrying up to
+     * {@link #RECONNECT_ATTEMPTS} times. Synchronized so concurrent callers (heartbeat, checker,
+     * watcher) cannot open competing sessions. Gives up immediately when interrupted or stopped.
+     */
+    private synchronized void reconnectAndRegister() {
+        int retries = RECONNECT_ATTEMPTS;
+        while (retries > 0 && !stopped) {
+            try {
+                zk.close();
+                zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
+                zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                // Watches belong to the old session: re-arm the children watch and pick up
+                // membership changes missed while disconnected.
+                refreshNodes();
+                log.info("Reconnected to ZooKeeper and re-registered node: {}", nodePath);
+                return;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while reconnecting to ZooKeeper, giving up", e);
+                return;
+            } catch (Exception e) {
+                retries--;
+                log.warn("Retrying connection, attempts left: {}", retries, e);
+                try {
+                    Thread.sleep(RECONNECT_BACKOFF_MS);
+                } catch (InterruptedException ignored) {
+                    // Re-interrupting and looping would make every further sleep fail instantly.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+        if (!stopped) {
+            log.error("Failed to reconnect and register node after multiple attempts.");
+        }
+    }
+
+    /**
+     * Stops the periodic tasks and closes the ZooKeeper session, which removes this server's
+     * ephemeral znode. Safe to call when {@link #registry()} failed before creating a client.
+     */
+    @Override
+    public void unRegister() {
+        stopped = true;
+        // Stop the periodic tasks before closing the session: a heartbeat hitting the closed client
+        // would otherwise go through reconnectAndRegister() and re-create this server's znode.
+        scheduler.shutdownNow();
+        synchronized (this) {
+            if (zk == null) {
+                return;
+            }
+            try {
+                zk.close();
+                log.info("ZooKeeper client closed: {}", nodePath);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Failed to close ZooKeeper client", e);
+            }
+        }
+    }
+}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
new file mode 100644
index 000000000..66e3c73a7
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RegistryServiceTest {
+
+    private final RegistryServiceImpl registryService = new RegistryServiceImpl();
+
+    /**
+     * Registers against the configured ZooKeeper quorum and checks that exactly this server is
+     * tracked. A no-op unless {@code high-availability.enable} is "true".
+     */
+    @Test
+    public void testRegister() {
+        if (!enableHA()) {
+            return;
+        }
+        try {
+            registryService.registry();
+            // registry() logs and swallows its own failures, so the outcome must be asserted here;
+            // an assertion inside a catch block could never run. The instance is not Spring-managed,
+            // so no DistributedTaskService is injected and no membership refresh populates other
+            // servers: only this server's own entry is expected.
+            Assertions.assertEquals(1, registryService.getCurrentNodes().size());
+        } finally {
+            registryService.unRegister();
+        }
+    }
+
+    public boolean enableHA() {
+        return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
+    }
+
+}
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index e685de5ad..5bc70347e 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -377,5 +377,7 @@ xml-apis-1.4.01.jar
xnio-api-3.8.7.Final.jar
xnio-nio-3.8.7.Final.jar
xz-1.5.jar
-zookeeper-3.4.14.jar
+zookeeper-3.6.3.jar
icu4j-67.1.jar
+zookeeper-jute-3.6.3.jar
+netty-transport-native-epoll-4.1.91.Final.jar