This is an automated email from the ASF dual-hosted git repository.
pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new a5b294ebb0 KYLIN-6071 Provide new jdbc service discovery (#2320)
a5b294ebb0 is described below
commit a5b294ebb0d19462053c949177fbc0e374287a8c
Author: Guoliang Sun <[email protected]>
AuthorDate: Thu Feb 27 18:44:29 2025 +0800
KYLIN-6071 Provide new jdbc service discovery (#2320)
Co-authored-by: Yinghao Lin <[email protected]>
---
build/sbin/bootstrap.sh | 10 +-
build/sbin/check-2000-zookeeper-role.sh | 17 +-
.../org/apache/kylin/common/KylinConfigBase.java | 33 +++-
.../apache/kylin/common/util/ClusterConstant.java | 8 +
.../src/main/resources/kylin-defaults0.properties | 4 +
.../apache/kylin/metadata/system/NodeRegistry.java | 172 +++++++++++++++++++++
.../kylin/metadata/system/NodeRegistryManager.java | 135 ++++++++++++++++
.../metadata/system/NodeRegistryManagerTest.java | 64 ++++++++
.../kylin/metadata/system/NodeRegistryTest.java | 148 ++++++++++++++++++
.../apache/kylin/rest/ZookeeperClusterManager.java | 2 +
.../kylin/rest/ZookeeperHostInfoFetcher.java | 2 +
.../apache/kylin/rest/config/ZookeeperConfig.java | 2 +
.../ConditionalOnNodeRegistryJdbcEnabled.java} | 28 ++--
...ConditionalOnNodeRegistryZookeeperEnabled.java} | 28 ++--
.../JdbcClusterManager.java} | 49 +++---
.../rest/discovery/KylinServiceDiscoveryCache.java | 1 +
.../discovery/KylinServiceDiscoveryClient.java | 1 +
.../kylin/rest/discovery/NodeRegistryService.java | 147 ++++++++++++++++++
src/server/src/main/resources/application.yaml | 1 +
.../kylin/rest/config/ZookeeperConfigTest.java | 1 +
.../rest/discovery/JdbcClusterManagerTest.java | 142 +++++++++++++++++
.../rest/discovery/NodeRegistryServiceTest.java | 162 +++++++++++++++++++
.../kylin/engine/spark/builder/ZKHelper.scala | 9 +-
23 files changed, 1090 insertions(+), 76 deletions(-)
diff --git a/build/sbin/bootstrap.sh b/build/sbin/bootstrap.sh
index 770ae4de94..99f6490f07 100755
--- a/build/sbin/bootstrap.sh
+++ b/build/sbin/bootstrap.sh
@@ -144,13 +144,11 @@ function skipCheckOrNot() {
}
function checkZookeeperRole() {
- # this is necessary in FI
- source "${KYLIN_HOME}"/sbin/load-zookeeper-config.sh
- if [[ $(skipCheckOrNot $KYLIN_SKIP_CHECK_MODE) ]]; then
- return 0
+
+ KYLIN_ZOOKEEPER_ENABLED=$("$KYLIN_HOME"/bin/get-properties.sh
kylin.env.zookeeper.enabled)
+ if [[ $KYLIN_ZOOKEEPER_ENABLED == "true" ]]; then
+ source ${KYLIN_HOME}/sbin/check-2000-zookeeper-role.sh
fi
- verboseLog "checking zookeeper role"
- source "${KYLIN_HOME}"/sbin/check-2000-zookeeper-role.sh
}
function checkSparkDir() {
diff --git a/build/sbin/check-2000-zookeeper-role.sh
b/build/sbin/check-2000-zookeeper-role.sh
index 3121c99553..fe57ba741a 100755
--- a/build/sbin/check-2000-zookeeper-role.sh
+++ b/build/sbin/check-2000-zookeeper-role.sh
@@ -21,10 +21,19 @@
source $(cd -P -- "$(dirname -- "$0")" && pwd -P)/header.sh
-echo "Checking Zookeeper role..."
+source ${KYLIN_HOME}/sbin/init-kerberos.sh
+## init Kerberos if needed
+initKerberosIfNeeded
-zk_connect_string=$("${KYLIN_HOME}"/bin/get-properties.sh
kylin.env.zookeeper-connect-string)
+KYLIN_ZOOKEEPER_ENABLED=$("$KYLIN_HOME"/bin/get-properties.sh
kylin.env.zookeeper.enabled)
-if [[ -z $zk_connect_string ]]; then
- quit "Failed: Zookeeper connect string is empty, please set
'kylin.env.zookeeper-connect-string' in {KYLIN_HOME}/conf/kylin.properties"
+if [[ $KYLIN_ZOOKEEPER_ENABLED == "true" ]]; then
+ echo "Checking Zookeeper role..."
+ zk_connect_string=$("${KYLIN_HOME}"/bin/get-properties.sh
kylin.env.zookeeper-connect-string)
+ if [[ -z $zk_connect_string ]]; then
+ quit "Failed: Zookeeper connect string is empty, please set
'kylin.env.zookeeper-connect-string' in {KYLIN_HOME}/conf/kylin.properties"
+ fi
+else
+ echo "KYLIN_ZOOKEEPER_ENABLED is ${KYLIN_ZOOKEEPER_ENABLED}. Skip check."
+ exit 3
fi
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ae408e0565..c4f124c451 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -670,9 +670,12 @@ public abstract class KylinConfigBase implements
Serializable {
}
public DistributedLockFactory getDistributedLockFactory() {
- String clsName = getOptional("kylin.metadata.distributed-lock-impl",
+ return (DistributedLockFactory)
ClassUtil.newInstance(getDistributedLockFactoryFullClassName());
+ }
+
+ public String getDistributedLockFactoryFullClassName() {
+ return getOptional("kylin.metadata.distributed-lock-impl",
"org.apache.kylin.common.lock.curator.CuratorDistributedLockFactory");
- return (DistributedLockFactory) ClassUtil.newInstance(clsName);
}
public StorageURL getJDBCDistributedLockURL() {
@@ -2497,7 +2500,7 @@ public abstract class KylinConfigBase implements
Serializable {
public String getMetadataStoreType() {
if (!isJobNode()) {
- return this.getOptional("kylin.server.store-type", "hdfs");
+ return this.getOptional("kylin.server.store-type", "jdbc");
} else {
return "jdbc";
}
@@ -4563,4 +4566,28 @@ public abstract class KylinConfigBase implements
Serializable {
public boolean isImprovedSumDecimalPrecisionEnabled() {
return
Boolean.parseBoolean(getOptional("kylin.query.improved-sum-decimal-precision.enabled",
FALSE));
}
+
+    /**
+     * Cron expression for the JDBC node-registry renew schedule.
+     * Read from {@code kylin.server.node-registry.jdbc.renew-cron}.
+     * NOTE(review): the default looks like a six-field (seconds-first) cron that fires every
+     * 30 seconds - confirm against the scheduler that consumes it.
+     *
+     * @return the configured cron expression, or {@code "0/30 * * * * *"} when unset
+     */
+    public String getServerNodeRegistryJdbcRenewCron() {
+        return getOptional("kylin.server.node-registry.jdbc.renew-cron", "0/30 * * * * *");
+    }
+
+    /**
+     * Cron expression for the JDBC node-registry check schedule.
+     * Read from {@code kylin.server.node-registry.jdbc.check-cron}.
+     * NOTE(review): the default looks like a six-field (seconds-first) cron that fires every
+     * two minutes - confirm against the scheduler that consumes it.
+     *
+     * @return the configured cron expression, or {@code "0 *&#47;2 * * * *"} when unset
+     */
+    public String getServerNodeRegistryJdbcCheckCron() {
+        return getOptional("kylin.server.node-registry.jdbc.check-cron", "0 */2 * * * *");
+    }
+
+    /**
+     * Expiry threshold for JDBC node-registry entries, converted to milliseconds.
+     * Read from {@code kylin.server.node-registry.jdbc.expire-threshold} as a time string.
+     *
+     * @return the threshold in milliseconds; {@code "90s"} is used when the property is unset
+     */
+    public long getNodeRegistryJdbcExpireThreshold() {
+        return TimeUtil.timeStringAs(getOptional("kylin.server.node-registry.jdbc.expire-threshold", "90s"),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Idle time applied around the JDBC node-registry check, converted to milliseconds.
+     * Sleeping for this long (8s by default) prevents the same operation from being
+     * duplicated on other servers. For the JDBC distributed lock implementation, the total
+     * locking duration should stay below the ttl of
+     * {@link org.springframework.integration.jdbc.lock.DefaultLockRepository}.
+     * Read from {@code kylin.server.node-registry.jdbc.check-idle-time} as a time string.
+     *
+     * @return check idle time in milliseconds
+     */
+    public long getServerNodeRegistryJdbcCheckIdleTime() {
+        return TimeUtil.timeStringAs(getOptional("kylin.server.node-registry.jdbc.check-idle-time", "8s"),
+                TimeUnit.MILLISECONDS);
+    }
}
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
b/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
index b28d4d7be8..4e6f66473f 100644
---
a/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
+++
b/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
@@ -48,5 +48,13 @@ public class ClusterConstant implements Serializable {
this.name = name;
}
+        /**
+         * Looks up a server mode by its name, ignoring case.
+         *
+         * @param name mode name compared against each constant's {@code name} field
+         * @return the first matching constant, or {@code null} when no constant matches
+         */
+        public static ServerModeEnum of(String name) {
+            ServerModeEnum[] candidates = values();
+            for (int idx = 0; idx < candidates.length; idx++) {
+                ServerModeEnum candidate = candidates[idx];
+                if (candidate.name.equalsIgnoreCase(name)) {
+                    return candidate;
+                }
+            }
+            return null;
+        }
}
}
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties
b/src/core-common/src/main/resources/kylin-defaults0.properties
index 9a367db672..bd4f910e79 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -30,6 +30,7 @@ kylin.env.hdfs-working-dir=/kylin
# The build engine writes data to this file system, which is the same as
defaultFS in core-site-xml by default
kylin.env.engine-write-fs=
# zookeeper is used for distributed locked, service discovery, leader
selection, etc.
+kylin.env.zookeeper.enabled=true
# example:
kylin.env.zookeeper-connect-string=10.1.2.1:2181,10.1.2.2:2181,10.1.2.3:2181
kylin.env.zookeeper-connect-string=
# save for InfluxDB
@@ -59,6 +60,9 @@ kylin.server.https.keystore-password=changeit
# Set true to enable CORS for any origins
kylin.server.cors.allow-all=false
+# Kylin service discovery type
+kylin.server.node-registry.type=zookeeper
+
# ==================== JOB EXECUTION ====================
### Spark conf overwrite for build engine
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistry.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistry.java
new file mode 100644
index 0000000000..c825cb1eb2
--- /dev/null
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistry.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.MetadataType;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Persistent registry of server nodes, grouped by {@link ServerModeEnum}.
+ * The whole registry is stored as a single {@link MetadataType#SYSTEM} resource named
+ * {@link #NODE_REGISTRY}; each node is identified inside its mode list by host and port.
+ */
+@Getter
+@JsonAutoDetect(getterVisibility = NONE, isGetterVisibility = NONE, setterVisibility = NONE, fieldVisibility = NONE)
+@Slf4j
+public class NodeRegistry extends RootPersistentEntity {
+
+    public static final String NODE_REGISTRY = "node_registry";
+
+    /**
+     * One registered node. Equality and hash code cover host, port and server mode;
+     * the heartbeat timestamp is excluded so that a renewed entry equals the old one.
+     */
+    @Getter
+    @JsonAutoDetect(getterVisibility = NONE, isGetterVisibility = NONE, setterVisibility = NONE, fieldVisibility = NONE)
+    @EqualsAndHashCode(callSuper = false)
+    @ToString
+    public static class NodeInstance {
+
+        public NodeInstance() {
+            // No-args constructor required for Jackson deserialization
+        }
+
+        /**
+         * @param modeStr either the exact enum constant name or the mode name accepted by
+         *                {@link ServerModeEnum#of(String)} (case-insensitive)
+         * @throws IllegalArgumentException when {@code modeStr} matches no server mode
+         */
+        public NodeInstance(String host, int port, String modeStr) {
+            this(host, port, parseServerMode(modeStr));
+        }
+
+        /** Creates an instance whose heartbeat timestamp is the current wall-clock time. */
+        public NodeInstance(String host, int port, ServerModeEnum mode) {
+            this.host = host;
+            this.port = port;
+            this.serverMode = mode;
+            this.lastHeartbeatTS = System.currentTimeMillis();
+        }
+
+        @JsonProperty("host")
+        private String host;
+
+        @JsonProperty("port")
+        private int port;
+
+        @JsonProperty("server_mode")
+        private ServerModeEnum serverMode;
+
+        @JsonProperty("last_heartbeat_ts")
+        @EqualsAndHashCode.Exclude
+        private long lastHeartbeatTS;
+
+        /**
+         * @return true when {@code that} is non-null and has the same host and port,
+         *         regardless of server mode
+         */
+        public boolean hostAndPortEquals(NodeInstance that) {
+            return that != null && Objects.equals(host, that.host) && port == that.port;
+        }
+
+        // Exact constant names keep working as before; anything else falls back to the
+        // case-insensitive lookup so lower-case mode names are accepted as well.
+        private static ServerModeEnum parseServerMode(String modeStr) {
+            try {
+                return ServerModeEnum.valueOf(modeStr);
+            } catch (IllegalArgumentException e) {
+                ServerModeEnum mode = ServerModeEnum.of(modeStr);
+                if (mode == null) {
+                    throw e;
+                }
+                return mode;
+            }
+        }
+    }
+
+    @JsonProperty("name")
+    private final String name = NODE_REGISTRY;
+
+    @JsonProperty("registry")
+    private final Map<ServerModeEnum, List<NodeInstance>> registry = new HashMap<>();
+
+    @JsonProperty("last_cleanup_ts")
+    private long lastCleanupTS;
+
+    /**
+     * @return the instances registered under {@code mode}; a new empty list when the mode
+     *         has no entry (or a null entry)
+     */
+    public List<NodeInstance> getNodeInstances(ServerModeEnum mode) {
+        List<NodeInstance> nodeInstances = registry.get(mode);
+        return nodeInstances != null ? nodeInstances : new ArrayList<>();
+    }
+
+    /**
+     * Adds {@code instance} to the list of its server mode, replacing an existing entry
+     * with the same host and port.
+     */
+    public void registerOrUpdate(NodeInstance instance) {
+        Preconditions.checkNotNull(instance);
+        List<NodeInstance> nodeInstances = registry.computeIfAbsent(instance.getServerMode(), k -> new ArrayList<>());
+        int indexFound = findIndexByHostAndPort(nodeInstances, instance);
+        if (indexFound > -1) {
+            nodeInstances.set(indexFound, instance);
+        } else {
+            nodeInstances.add(instance);
+        }
+    }
+
+    /**
+     * Removes the entry with the same host and port from the list of the instance's
+     * server mode; does nothing when no such entry exists.
+     */
+    public void deregister(NodeInstance instance) {
+        Preconditions.checkNotNull(instance);
+        List<NodeInstance> nodeInstances = registry.get(instance.getServerMode());
+        if (nodeInstances != null && !nodeInstances.isEmpty()) {
+            int indexFound = findIndexByHostAndPort(nodeInstances, instance);
+            if (indexFound > -1) {
+                nodeInstances.remove(indexFound);
+            }
+        }
+    }
+
+    /**
+     * Drops every instance whose last heartbeat is older than the configured expire
+     * threshold and records the current time as the last cleanup timestamp.
+     */
+    public void removeExpiredRegistrations() {
+        long now = System.currentTimeMillis();
+        long expireThreshold = KylinConfig.getInstanceFromEnv().getNodeRegistryJdbcExpireThreshold();
+        long timeoutTS = now - expireThreshold;
+        for (List<NodeInstance> nodeInstances : registry.values()) {
+            if (nodeInstances != null && !nodeInstances.isEmpty()) {
+                nodeInstances.removeIf(node -> node.getLastHeartbeatTS() < timeoutTS);
+            }
+        }
+        lastCleanupTS = now;
+    }
+
+    /** @return true when no server mode has any registered instance */
+    public boolean isEmpty() {
+        // Null lists are treated as empty, matching removeExpiredRegistrations().
+        return registry.values().stream().allMatch(nodeInstances -> nodeInstances == null || nodeInstances.isEmpty());
+    }
+
+    @Override
+    public String resourceName() {
+        return NODE_REGISTRY;
+    }
+
+    @Override
+    public MetadataType resourceType() {
+        return MetadataType.SYSTEM;
+    }
+
+    // Linear scan; returns -1 when no entry shares the host and port of the given instance.
+    private static int findIndexByHostAndPort(List<NodeInstance> nodeInstances, NodeInstance instance) {
+        for (int i = 0; i < nodeInstances.size(); i++) {
+            NodeInstance ins = nodeInstances.get(i);
+            if (ins.hostAndPortEquals(instance)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistryManager.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistryManager.java
new file mode 100644
index 0000000000..72390c8e52
--- /dev/null
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistryManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.apache.kylin.metadata.system.NodeRegistry.NODE_REGISTRY;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.MetadataType;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Reads and updates the single {@link NodeRegistry} resource on behalf of the local node.
+ * The local node is described by the local host address, the configured server port and
+ * the configured server mode, all captured once at construction time.
+ */
+@Slf4j
+public class NodeRegistryManager {
+
+    public static NodeRegistryManager getInstance(KylinConfig config) {
+        return config.getManager(NodeRegistryManager.class);
+    }
+
+    // called by reflection
+    static NodeRegistryManager newInstance(KylinConfig config) {
+        return new NodeRegistryManager(config);
+    }
+
+    private final KylinConfig config;
+    private final CachedCrudAssist<NodeRegistry> crud;
+    private final String host;
+    private final int port;
+    private final ServerModeEnum serverMode;
+
+    public NodeRegistryManager(KylinConfig config) {
+        if (!UnitOfWork.isAlreadyInTransaction()) {
+            log.info("Initializing NodeRegistryManager with KylinConfig Id: {}", System.identityHashCode(config));
+        }
+        this.config = config;
+        crud = new CachedCrudAssist<NodeRegistry>(getStore(), MetadataType.SYSTEM, null, NodeRegistry.class) {
+            @Override
+            protected NodeRegistry initEntityAfterReload(NodeRegistry entity, String projectName) {
+                return entity;
+            }
+        };
+        host = AddressUtil.getLocalHostExactAddress();
+        port = Integer.parseInt(config.getServerPort());
+        // NOTE(review): ServerModeEnum.of returns null for an unknown mode name - confirm
+        // that config.getServerMode() can only yield known names.
+        serverMode = ServerModeEnum.of(config.getServerMode());
+    }
+
+    /** @return the stored registry, or {@code null} when it does not exist */
+    public NodeRegistry getNodeRegistry() {
+        return crud.get(NODE_REGISTRY);
+    }
+
+    /**
+     * @return the instances registered under {@code mode}; an empty list when the registry
+     *         resource does not exist
+     */
+    public List<NodeRegistry.NodeInstance> getNodeInstances(ServerModeEnum mode) {
+        NodeRegistry nodeRegistry = getNodeRegistry();
+        if (nodeRegistry == null) {
+            return new java.util.ArrayList<>();
+        }
+        return nodeRegistry.getNodeInstances(mode);
+    }
+
+    /** @return a fresh instance describing the local node, stamped with the current time */
+    public NodeRegistry.NodeInstance getLocalNodeInstance() {
+        return newNodeInstance();
+    }
+
+    /** Registers the local node, creating the registry resource first when it is missing. */
+    public void createNodeRegistryIfNotExists() {
+        renew();
+    }
+
+    /**
+     * Refreshes the local node's entry with a new heartbeat timestamp.
+     * The registry is re-created when it is missing (for example after another node's
+     * {@link #cleanup()} deleted it) instead of failing on a null registry.
+     */
+    public void renew() {
+        NodeRegistry existing = getNodeRegistry();
+        NodeRegistry nodeRegistry = copyForWrite(existing != null ? existing : new NodeRegistry());
+        nodeRegistry.registerOrUpdate(newNodeInstance());
+        crud.save(nodeRegistry);
+    }
+
+    /** Removes expired registrations and saves the registry; no-op when it does not exist. */
+    public void checkAndClean() {
+        NodeRegistry nodeRegistry = getNodeRegistry();
+        if (nodeRegistry == null) {
+            return;
+        }
+        nodeRegistry = copyForWrite(nodeRegistry);
+        nodeRegistry.removeExpiredRegistrations();
+        crud.save(nodeRegistry);
+    }
+
+    /**
+     * Deregisters the local node. The registry resource is deleted when no instance is
+     * left afterwards, otherwise it is saved; no-op when it does not exist.
+     */
+    public void cleanup() {
+        NodeRegistry nodeRegistry = getNodeRegistry();
+        if (nodeRegistry == null) {
+            return;
+        }
+        nodeRegistry = copyForWrite(nodeRegistry);
+        nodeRegistry.deregister(newNodeInstance());
+        if (nodeRegistry.isEmpty()) {
+            crud.delete(nodeRegistry);
+        } else {
+            crud.save(nodeRegistry);
+        }
+    }
+
+    public NodeRegistry copyForWrite(NodeRegistry nodeRegistry) {
+        Preconditions.checkNotNull(nodeRegistry);
+        return crud.copyForWrite(nodeRegistry);
+    }
+
+    ResourceStore getStore() {
+        return ResourceStore.getKylinMetaStore(config);
+    }
+
+    private NodeRegistry.NodeInstance newNodeInstance() {
+        return new NodeRegistry.NodeInstance(host, port, serverMode);
+    }
+}
diff --git
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryManagerTest.java
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryManagerTest.java
new file mode 100644
index 0000000000..86830a8fbf
--- /dev/null
+++
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryManagerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.ClusterConstant;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for {@link NodeRegistryManager} against local file metadata with server mode "all". */
+public class NodeRegistryManagerTest extends NLocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+        overwriteSystemProp("kylin.server.mode", "all");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testCreateNodeRegistry() throws Exception {
+        NodeRegistryManager manager = NodeRegistryManager.getInstance(getTestConfig());
+        manager.createNodeRegistryIfNotExists();
+
+        List<NodeRegistry.NodeInstance> nodeInstances = manager.getNodeRegistry()
+                .getNodeInstances(ClusterConstant.ServerModeEnum.ALL);
+        assertEquals(1, nodeInstances.size());
+        NodeRegistry.NodeInstance instance = nodeInstances.get(0);
+        long firstHeartbeat = instance.getLastHeartbeatTS();
+
+        // Heartbeats have millisecond resolution: wait for the clock to move past the first
+        // one so the strict comparison below cannot fail when both calls hit the same tick.
+        while (System.currentTimeMillis() <= firstHeartbeat) {
+            Thread.yield();
+        }
+
+        // test renew condition on registry existing
+        manager.createNodeRegistryIfNotExists();
+        List<NodeRegistry.NodeInstance> newNodeInstances = manager.getNodeRegistry()
+                .getNodeInstances(ClusterConstant.ServerModeEnum.ALL);
+        assertEquals(1, newNodeInstances.size());
+        NodeRegistry.NodeInstance newInstance = newNodeInstances.get(0);
+
+        assertTrue(newInstance.getLastHeartbeatTS() > firstHeartbeat);
+    }
+}
diff --git
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryTest.java
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryTest.java
new file mode 100644
index 0000000000..343f5a87d3
--- /dev/null
+++
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+/** Unit tests for {@link NodeRegistry} and {@link NodeRegistry.NodeInstance}. */
+public class NodeRegistryTest extends NLocalFileMetadataTestCase {
+
+    private NodeRegistry nodeRegistry;
+    private NodeRegistry.NodeInstance instanceAll1;
+    private NodeRegistry.NodeInstance instanceAll2;
+    private NodeRegistry.NodeInstance instanceJob1;
+
+    @Before
+    public void setup() throws Exception {
+        nodeRegistry = new NodeRegistry();
+        instanceAll1 = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.ALL);
+        instanceAll2 = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.ALL);
+        instanceJob1 = new NodeRegistry.NodeInstance("host2", 8080, ServerModeEnum.JOB);
+
+        // Use test metadata and a 1s expire threshold so expiration can be exercised
+        createTestMetadata();
+        overwriteSystemProp("kylin.server.node-registry.jdbc.expire-threshold", "1s");
+    }
+
+    @After
+    public void tearDown() {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testRegisterOrUpdate_AddNewNode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        List<NodeRegistry.NodeInstance> instances = nodeRegistry.getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, instances.size());
+        assertEquals(instanceAll1, instances.get(0));
+    }
+
+    @Test
+    public void testRegisterOrUpdate_UpdateExistingNode() {
+        // Backdate the first heartbeat so the two timestamps differ even when both
+        // instances are created within the same millisecond.
+        ReflectionTestUtils.setField(instanceAll1, "lastHeartbeatTS", System.currentTimeMillis() - 1000);
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        // Simulate update with new heartbeat time
+        NodeRegistry.NodeInstance updatedInstance = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.ALL);
+        nodeRegistry.registerOrUpdate(updatedInstance);
+
+        List<NodeRegistry.NodeInstance> instances = nodeRegistry.getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, instances.size());
+        assertNotEquals(instanceAll1.getLastHeartbeatTS(), instances.get(0).getLastHeartbeatTS());
+        assertEquals(updatedInstance.getLastHeartbeatTS(), instances.get(0).getLastHeartbeatTS());
+    }
+
+    @Test
+    public void testDeregister_ExistingNode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        nodeRegistry.deregister(instanceAll1);
+        assertTrue(nodeRegistry.getNodeInstances(ServerModeEnum.ALL).isEmpty());
+    }
+
+    @Test
+    public void testDeregister_NonExistingNode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        nodeRegistry.deregister(instanceJob1); // Different mode
+        assertEquals(1, nodeRegistry.getNodeInstances(ServerModeEnum.ALL).size());
+    }
+
+    @Test
+    public void testRemoveExpiredRegistrations() throws Exception {
+        // Create expired instance (older than threshold)
+        NodeRegistry.NodeInstance expiredInstance = new NodeRegistry.NodeInstance("host3", 9090, ServerModeEnum.QUERY);
+        ReflectionTestUtils.setField(expiredInstance, "lastHeartbeatTS", System.currentTimeMillis() - 2000);
+
+        // Create valid instance
+        NodeRegistry.NodeInstance validInstance = new NodeRegistry.NodeInstance("host4", 9091, ServerModeEnum.QUERY);
+        ReflectionTestUtils.setField(validInstance, "lastHeartbeatTS", System.currentTimeMillis() - 500);
+
+        nodeRegistry.registerOrUpdate(expiredInstance);
+        nodeRegistry.registerOrUpdate(validInstance);
+
+        nodeRegistry.removeExpiredRegistrations();
+
+        List<NodeRegistry.NodeInstance> instances = nodeRegistry.getNodeInstances(ServerModeEnum.QUERY);
+        assertEquals(1, instances.size());
+        assertEquals(validInstance, instances.get(0));
+    }
+
+    @Test
+    public void testIsEmpty() {
+        assertTrue(nodeRegistry.isEmpty());
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        assertFalse(nodeRegistry.isEmpty());
+        nodeRegistry.deregister(instanceAll1);
+        assertTrue(nodeRegistry.isEmpty());
+    }
+
+    @Test
+    public void testNodeInstanceEqualsAndHashCode() {
+        assertEquals(instanceAll1, instanceAll2);
+        assertEquals(instanceAll1.hashCode(), instanceAll2.hashCode());
+
+        NodeRegistry.NodeInstance differentPort = new NodeRegistry.NodeInstance("host1", 7071, ServerModeEnum.ALL);
+        assertNotEquals(instanceAll1, differentPort);
+    }
+
+    @Test
+    public void testHostAndPortEquals() {
+        NodeRegistry.NodeInstance differentMode = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.JOB);
+        assertTrue(instanceAll1.hostAndPortEquals(differentMode));
+    }
+
+    @Test
+    public void testGetNodeInstancesByMode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        nodeRegistry.registerOrUpdate(instanceJob1);
+
+        List<NodeRegistry.NodeInstance> allInstances = nodeRegistry.getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, allInstances.size());
+
+        List<NodeRegistry.NodeInstance> jobInstances = nodeRegistry.getNodeInstances(ServerModeEnum.JOB);
+        assertEquals(1, jobInstances.size());
+    }
+}
\ No newline at end of file
diff --git
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
index 50761c7b2b..66a56d93c6 100644
---
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
+++
b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kylin.rest.cluster.ClusterManager;
+import
org.apache.kylin.rest.discovery.ConditionalOnNodeRegistryZookeeperEnabled;
import org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache;
import org.apache.kylin.rest.discovery.KylinServiceDiscoveryClient;
import org.apache.kylin.rest.response.ServerInfoResponse;
@@ -37,6 +38,7 @@ import lombok.val;
import lombok.extern.slf4j.Slf4j;
@ConditionalOnZookeeperEnabled // if missing, a default impl will be filled by
AppConfig.clusterManager()
+@ConditionalOnNodeRegistryZookeeperEnabled
@Component
@Slf4j
public class ZookeeperClusterManager implements ClusterManager {
diff --git
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
index 658481e8ae..e23646288c 100644
---
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
+++
b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest;
import org.apache.kylin.common.util.HostInfoFetcher;
+import
org.apache.kylin.rest.discovery.ConditionalOnNodeRegistryZookeeperEnabled;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
@@ -26,6 +27,7 @@ import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
@Component
@Slf4j
public class ZookeeperHostInfoFetcher implements HostInfoFetcher {
diff --git
a/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
b/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
index f93632b8f1..40cac84bcc 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.config;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.kylin.common.util.ZookeeperAclBuilder;
+import
org.apache.kylin.rest.discovery.ConditionalOnNodeRegistryZookeeperEnabled;
import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
import org.springframework.cloud.zookeeper.CuratorFrameworkCustomizer;
import org.springframework.context.annotation.Configuration;
@@ -29,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Configuration(proxyBeanMethods = false)
@ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
public class ZookeeperConfig implements CuratorFrameworkCustomizer {
@Override
diff --git
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryJdbcEnabled.java
similarity index 55%
copy from
src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
copy to
src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryJdbcEnabled.java
index 658481e8ae..8f2821e7c4 100644
---
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
+++
b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryJdbcEnabled.java
@@ -15,26 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kylin.rest;
-import org.apache.kylin.common.util.HostInfoFetcher;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.commons.util.InetUtils;
-import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
-import org.springframework.stereotype.Component;
+package org.apache.kylin.rest.discovery;
-import lombok.extern.slf4j.Slf4j;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
-@ConditionalOnZookeeperEnabled
-@Component
-@Slf4j
-public class ZookeeperHostInfoFetcher implements HostInfoFetcher {
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- @Autowired
- InetUtils inetUtils;
-
- @Override
- public String getHostname() {
- return inetUtils.findFirstNonLoopbackHostInfo().getHostname();
- }
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE })
+@ConditionalOnProperty(name = "kylin.server.node-registry.type", havingValue =
"jdbc")
+public @interface ConditionalOnNodeRegistryJdbcEnabled {
}
diff --git a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryZookeeperEnabled.java
similarity index 55%
copy from src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
copy to src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryZookeeperEnabled.java
index 658481e8ae..c9d884062a 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryZookeeperEnabled.java
@@ -15,26 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kylin.rest;
-import org.apache.kylin.common.util.HostInfoFetcher;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.commons.util.InetUtils;
-import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
-import org.springframework.stereotype.Component;
+package org.apache.kylin.rest.discovery;
-import lombok.extern.slf4j.Slf4j;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
-@ConditionalOnZookeeperEnabled
-@Component
-@Slf4j
-public class ZookeeperHostInfoFetcher implements HostInfoFetcher {
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- @Autowired
- InetUtils inetUtils;
-
- @Override
- public String getHostname() {
- return inetUtils.findFirstNonLoopbackHostInfo().getHostname();
- }
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE })
+@ConditionalOnProperty(name = "kylin.server.node-registry.type", havingValue =
"zookeeper")
+public @interface ConditionalOnNodeRegistryZookeeperEnabled {
}
diff --git a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java b/src/server/src/main/java/org/apache/kylin/rest/discovery/JdbcClusterManager.java
similarity index 57%
copy from src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
copy to src/server/src/main/java/org/apache/kylin/rest/discovery/JdbcClusterManager.java
index 50761c7b2b..0be7595e29 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/discovery/JdbcClusterManager.java
@@ -15,44 +15,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kylin.rest;
+
+package org.apache.kylin.rest.discovery;
import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.apache.commons.lang3.ArrayUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClusterConstant;
+import org.apache.kylin.metadata.system.NodeRegistry;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
import org.apache.kylin.rest.cluster.ClusterManager;
-import org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache;
-import org.apache.kylin.rest.discovery.KylinServiceDiscoveryClient;
import org.apache.kylin.rest.response.ServerInfoResponse;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
import org.springframework.stereotype.Component;
-import lombok.val;
import lombok.extern.slf4j.Slf4j;
-@ConditionalOnZookeeperEnabled // if missing, a default impl will be filled by
AppConfig.clusterManager()
+@ConditionalOnNodeRegistryJdbcEnabled
@Component
@Slf4j
-public class ZookeeperClusterManager implements ClusterManager {
-
- @Autowired
- private KylinServiceDiscoveryCache serviceCache;
-
- @Autowired
- private KylinServiceDiscoveryClient discoveryClient;
-
- public ZookeeperClusterManager() {
- }
+public class JdbcClusterManager implements ClusterManager {
@Override
public String getLocalServer() {
- return discoveryClient.getLocalServiceServer();
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+ NodeRegistry.NodeInstance localInstance =
manager.getLocalNodeInstance();
+ return instance2Str(localInstance);
}
@Override
@@ -75,17 +69,22 @@ public class ZookeeperClusterManager implements ClusterManager {
return getServerByMode(ServerModeEnum.ALL, ServerModeEnum.JOB,
ServerModeEnum.QUERY);
}
- private List<ServerInfoResponse> getServerByMode(@Nullable
ServerModeEnum... serverModeEnum) {
+ private static List<ServerInfoResponse> getServerByMode(
+ @Nullable ClusterConstant.ServerModeEnum... serverModeEnum) {
List<ServerInfoResponse> servers = new ArrayList<>();
-
- if (ArrayUtils.isEmpty(serverModeEnum)) {
+ if (serverModeEnum == null || serverModeEnum.length == 0) {
return servers;
}
-
- for (val nodeModeType : serverModeEnum) {
-
servers.addAll(serviceCache.getServerInfoByServerMode(nodeModeType));
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+ for (ClusterConstant.ServerModeEnum mode : serverModeEnum) {
+ servers.addAll(manager.getNodeInstances(mode).stream()
+ .map(instance -> new
ServerInfoResponse(instance2Str(instance), instance.getServerMode().getName()))
+ .collect(Collectors.toList()));
}
return servers;
}
+ private static String instance2Str(NodeRegistry.NodeInstance instance) {
+ return String.format(Locale.ROOT, "%s:%s", instance.getHost(),
instance.getPort());
+ }
}
diff --git a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
index e1b3ac24fb..43f97c1c90 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
@@ -61,6 +61,7 @@ import org.springframework.stereotype.Component;
import lombok.val;
@ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
@Component
public class KylinServiceDiscoveryCache implements KylinServiceDiscovery {
private static final Logger logger =
LoggerFactory.getLogger(KylinServiceDiscoveryCache.class);
diff --git a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
index 6422566ec9..a8a5075fe8 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
@@ -40,6 +40,7 @@ import lombok.val;
import lombok.extern.slf4j.Slf4j;
@ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
@Component
@Slf4j
public class KylinServiceDiscoveryClient implements KylinServiceDiscovery {
diff --git a/src/server/src/main/java/org/apache/kylin/rest/discovery/NodeRegistryService.java b/src/server/src/main/java/org/apache/kylin/rest/discovery/NodeRegistryService.java
new file mode 100644
index 0000000000..66b4a504a4
--- /dev/null
+++ b/src/server/src/main/java/org/apache/kylin/rest/discovery/NodeRegistryService.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.discovery;
+
+import java.util.TimeZone;
+import java.util.concurrent.locks.Lock;
+
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.ibatis.exceptions.PersistenceException;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.TransactionException;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
+import org.apache.kylin.rest.aspect.Transaction;
+import org.apache.kylin.rest.util.SpringContext;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.stereotype.Service;
+
+import lombok.extern.slf4j.Slf4j;
+
+@ConditionalOnNodeRegistryJdbcEnabled
+@Service
+@Slf4j
+public class NodeRegistryService {
+
+ private ThreadPoolTaskScheduler scheduler;
+
+ @EventListener(ApplicationReadyEvent.class)
+ public void initOnApplicationReady(ApplicationReadyEvent event) {
+ log.info("NodeRegistryService initOnApplicationReady");
+
SpringContext.getBean(NodeRegistryService.class).tryCreateNodeRegistry();
+
+ // Since the spring scheduler annotation is skipped on query mode
according to SchedulerEnhancer,
+ // an independent task scheduler is used.
+ // See org.apache.kylin.rest.aspect.SchedulerEnhancer for more details.
+ scheduler = new ThreadPoolTaskScheduler();
+ scheduler.setPoolSize(2);
+ scheduler.setThreadNamePrefix("NodeRegistryScheduler-");
+ scheduler.initialize();
+
SpringContext.getBean(NodeRegistryService.class).manuallyScheduleTasks();
+ }
+
+ public void manuallyScheduleTasks() {
+ if (scheduler == null) {
+ throw new IllegalStateException("scheduler not initialized");
+ }
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ String renewCron = config.getServerNodeRegistryJdbcRenewCron();
+ String checkCron = config.getServerNodeRegistryJdbcCheckCron();
+ scheduler.schedule(() ->
SpringContext.getBean(NodeRegistryService.class).scheduleRenew(),
+ new CronTrigger(renewCron, TimeZone.getDefault()));
+ scheduler.schedule(() ->
SpringContext.getBean(NodeRegistryService.class).scheduleCheck(),
+ new CronTrigger(checkCron, TimeZone.getDefault()));
+ }
+
+ @PreDestroy
+ public void preDestroy() {
+ log.info("NodeRegistryService preDestroy");
+
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(UnitOfWorkParams.builder().unitName(UnitOfWork.GLOBAL_UNIT)
+
.projectId(StringUtils.EMPTY).readonly(false).maxRetry(1).processor(() -> {
+ try {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+ manager.cleanup();
+ return null;
+ } catch (Throwable throwable) {
+ throw new TransactionException(throwable);
+ }
+ }).build());
+ }
+
+ public void scheduleRenew() {
+ log.debug("Start renew registry");
+ SpringContext.getBean(NodeRegistryService.class).renew();
+ log.debug("End renew registry");
+ }
+
+ public void scheduleCheck() {
+ Lock lock =
KylinConfig.getInstanceFromEnv().getDistributedLockFactory()
+ .lockForCurrentProcess("node-registry-check-lock");
+ if (lock.tryLock()) {
+ try {
+ log.debug("Start check registry");
+ SpringContext.getBean(NodeRegistryService.class).check();
+
sleep(KylinConfig.getInstanceFromEnv().getServerNodeRegistryJdbcCheckIdleTime());
+ } catch (Exception e) {
+ log.warn("Check registry failed", e);
+ } finally {
+ lock.unlock();
+ log.debug("End check registry");
+ }
+ }
+ }
+
+ @Transaction
+ public void tryCreateNodeRegistry() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+ try {
+ manager.createNodeRegistryIfNotExists();
+ } catch (PersistenceException e) {
+ log.warn("SYSTEM/node_registry is most likely existing");
+ }
+ }
+
+ @Transaction
+ public void renew() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+ manager.renew();
+ }
+
+ @Transaction(retry = 1)
+ public void check() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+ manager.checkAndClean();
+ }
+
+ private static void sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ log.warn("Thread sleeping was interrupted");
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/src/server/src/main/resources/application.yaml b/src/server/src/main/resources/application.yaml
index c4d6f36ed4..300f755173 100644
--- a/src/server/src/main/resources/application.yaml
+++ b/src/server/src/main/resources/application.yaml
@@ -89,6 +89,7 @@ spring:
composite-indicator:
enabled: false
zookeeper:
+ enabled: ${kylin.env.zookeeper.enabled}
connect-string: ${kylin.env.zookeeper-connect-string}
discovery:
root: /kylin/${kylin.metadata.url.unique-id}/services
diff --git a/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java b/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
index 008d0de076..d0f1f93f7a 100644
--- a/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
+++ b/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
@@ -46,6 +46,7 @@ class ZookeeperConfigTest {
@Test
@OverwriteProp.OverwriteProps({ //
+ @OverwriteProp(key = "kylin.server.node-registry.type", value =
"zookeeper"),
@OverwriteProp(key = "kylin.env.zookeeper-acl-enabled", value =
"true"),
@OverwriteProp(key = "kylin.env.zookeeper.zk-auth", value =
"digest:ADMIN:KYLIN"),
@OverwriteProp(key = "kylin.env.zookeeper.zk-acl", value =
"world:anyone:rwcda") })
diff --git a/src/server/src/test/java/org/apache/kylin/rest/discovery/JdbcClusterManagerTest.java b/src/server/src/test/java/org/apache/kylin/rest/discovery/JdbcClusterManagerTest.java
new file mode 100644
index 0000000000..4573c2f1c1
--- /dev/null
+++ b/src/server/src/test/java/org/apache/kylin/rest/discovery/JdbcClusterManagerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.discovery;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.kylin.common.util.ClusterConstant;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.system.NodeRegistry;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
+import org.apache.kylin.rest.response.ServerInfoResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class JdbcClusterManagerTest extends NLocalFileMetadataTestCase {
+
+ private JdbcClusterManager jdbcClusterManager;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() throws Exception {
+ createTestMetadata();
+ overwriteSystemProp("kylin.server.mode", "all");
+
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ manager.createNodeRegistryIfNotExists();
+
+ CachedCrudAssist<NodeRegistry> crud = (CachedCrudAssist<NodeRegistry>)
ReflectionTestUtils.getField(manager,
+ "crud");
+ assertNotNull(crud);
+
+ {
+ NodeRegistry nodeRegistry =
crud.copyForWrite(manager.getNodeRegistry());
+ NodeRegistry.NodeInstance instance = new
NodeRegistry.NodeInstance("123.123.123.101", 7070,
+ ClusterConstant.ServerModeEnum.QUERY);
+ nodeRegistry.registerOrUpdate(instance);
+ crud.save(nodeRegistry);
+ }
+ {
+ NodeRegistry nodeRegistry =
crud.copyForWrite(manager.getNodeRegistry());
+ NodeRegistry.NodeInstance instance = new
NodeRegistry.NodeInstance("123.123.123.102", 7070,
+ ClusterConstant.ServerModeEnum.JOB);
+ nodeRegistry.registerOrUpdate(instance);
+ crud.save(nodeRegistry);
+ }
+
+ assertEquals(3,
manager.getNodeRegistry().getRegistry().values().size());
+ assertEquals(1,
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).size());
+ assertEquals(1,
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.QUERY).size());
+ assertEquals(1,
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.JOB).size());
+
+ jdbcClusterManager = new JdbcClusterManager();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testGetLocalServer() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ NodeRegistry.NodeInstance instance =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+ assertEquals(toIpPortStr(instance.getHost(), instance.getPort()),
jdbcClusterManager.getLocalServer());
+ }
+
+ @Test
+ public void testGetQueryServers() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ NodeRegistry.NodeInstance all =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+ NodeRegistry.NodeInstance query =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.QUERY).get(0);
+
+ List<ServerInfoResponse> servers =
jdbcClusterManager.getQueryServers();
+ assertEquals(2, servers.size());
+
+ assertEquals(toIpPortStr(all.getHost(), all.getPort()),
servers.get(0).getHost());
+ assertEquals(all.getServerMode().getName(), servers.get(0).getMode());
+ assertEquals(toIpPortStr(query.getHost(), query.getPort()),
servers.get(1).getHost());
+ assertEquals(query.getServerMode().getName(),
servers.get(1).getMode());
+ }
+
+ @Test
+ public void testGetJobServers() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ NodeRegistry.NodeInstance all =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+ NodeRegistry.NodeInstance job =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.JOB).get(0);
+
+ List<ServerInfoResponse> servers = jdbcClusterManager.getJobServers();
+ assertEquals(2, servers.size());
+
+ assertEquals(toIpPortStr(all.getHost(), all.getPort()),
servers.get(0).getHost());
+ assertEquals(all.getServerMode().getName(), servers.get(0).getMode());
+ assertEquals(toIpPortStr(job.getHost(), job.getPort()),
servers.get(1).getHost());
+ assertEquals(job.getServerMode().getName(), servers.get(1).getMode());
+ }
+
+ @Test
+ public void testGetServers() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ NodeRegistry.NodeInstance all =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+ NodeRegistry.NodeInstance job =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.JOB).get(0);
+ NodeRegistry.NodeInstance query =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.QUERY).get(0);
+
+ List<ServerInfoResponse> servers =
jdbcClusterManager.getServersFromCache();
+ assertEquals(3, servers.size());
+
+ assertEquals(toIpPortStr(all.getHost(), all.getPort()),
servers.get(0).getHost());
+ assertEquals(all.getServerMode().getName(), servers.get(0).getMode());
+ assertEquals(toIpPortStr(job.getHost(), job.getPort()),
servers.get(1).getHost());
+ assertEquals(job.getServerMode().getName(), servers.get(1).getMode());
+ assertEquals(toIpPortStr(query.getHost(), query.getPort()),
servers.get(2).getHost());
+ assertEquals(query.getServerMode().getName(),
servers.get(2).getMode());
+ }
+
+ private static String toIpPortStr(String ip, int port) {
+ return String.format(Locale.ROOT, "%s:%s", ip, port);
+ }
+}
diff --git a/src/server/src/test/java/org/apache/kylin/rest/discovery/NodeRegistryServiceTest.java b/src/server/src/test/java/org/apache/kylin/rest/discovery/NodeRegistryServiceTest.java
new file mode 100644
index 0000000000..7ca6dfdc6f
--- /dev/null
+++ b/src/server/src/test/java/org/apache/kylin/rest/discovery/NodeRegistryServiceTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.discovery;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.system.NodeRegistry;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
+import org.apache.kylin.rest.util.SpringContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class NodeRegistryServiceTest extends NLocalFileMetadataTestCase {
+
+ private NodeRegistryService nodeRegistryService;
+
+ @Before
+ public void setUp() {
+ createTestMetadata();
+ overwriteSystemProp("kylin.env.zookeeper.enabled", "false");
+ overwriteSystemProp("kylin.server.node-registry.type", "jdbc");
+ overwriteSystemProp("kylin.metadata.distributed-lock-impl",
"org.apache.kylin.common.lock.LocalLockFactory");
+ overwriteSystemProp("kylin.server.mode", "all");
+
+ nodeRegistryService = Mockito.spy(new NodeRegistryService());
+
+ ApplicationContext applicationContext =
Mockito.mock(ApplicationContext.class);
+
when(applicationContext.getBean(NodeRegistryService.class)).thenReturn(nodeRegistryService);
+ SpringContext.setApplicationContextImpl(applicationContext);
+ }
+
+ @After
+ public void tearDown() {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testInitOnApplicationReady() {
+ doNothing().when(nodeRegistryService).manuallyScheduleTasks();
+ nodeRegistryService.initOnApplicationReady(null);
+ verify(nodeRegistryService, times(1)).tryCreateNodeRegistry();
+ verify(nodeRegistryService, times(1)).manuallyScheduleTasks();
+ assertNotNull(ReflectionTestUtils.getField(nodeRegistryService,
"scheduler"));
+ }
+
+ @Test
+ public void testManuallyScheduleTasks() {
+ try {
+ nodeRegistryService.manuallyScheduleTasks();
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("scheduler not initialized"));
+ }
+
+ ThreadPoolTaskScheduler scheduler =
mock(ThreadPoolTaskScheduler.class);
+ ReflectionTestUtils.setField(nodeRegistryService, "scheduler",
scheduler);
+ nodeRegistryService.manuallyScheduleTasks();
+ verify(scheduler, times(2)).schedule(any(Runnable.class),
any(CronTrigger.class));
+ }
+
+ @Test
+ public void testRenew() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ manager.createNodeRegistryIfNotExists();
+
+ List<NodeRegistry.NodeInstance> nodeInstances =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL);
+ assertEquals(1, nodeInstances.size());
+ NodeRegistry.NodeInstance instance = nodeInstances.get(0);
+
+ nodeRegistryService.scheduleRenew();
+ List<NodeRegistry.NodeInstance> newNodeInstances =
manager.getNodeRegistry()
+ .getNodeInstances(ServerModeEnum.ALL);
+ assertEquals(1, newNodeInstances.size());
+ NodeRegistry.NodeInstance newInstance = newNodeInstances.get(0);
+
+ assertTrue(newInstance.getLastHeartbeatTS() >
instance.getLastHeartbeatTS());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCheckAndClean() {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ manager.createNodeRegistryIfNotExists();
+
+ CachedCrudAssist<NodeRegistry> crud = (CachedCrudAssist<NodeRegistry>)
ReflectionTestUtils.getField(manager,
+ "crud");
+ assertNotNull(crud);
+ NodeRegistry nodeRegistry =
crud.copyForWrite(manager.getNodeRegistry());
+ NodeRegistry.NodeInstance timeoutInstance = new
NodeRegistry.NodeInstance("123.123.123.123", 7070,
+ ServerModeEnum.QUERY);
+ ReflectionTestUtils.setField(timeoutInstance, "lastHeartbeatTS",
+ System.currentTimeMillis() -
getTestConfig().getNodeRegistryJdbcExpireThreshold() - 500);
+ nodeRegistry.registerOrUpdate(timeoutInstance);
+ crud.save(nodeRegistry);
+
+ List<NodeRegistry.NodeInstance> nodeInstances =
manager.getNodeRegistry()
+ .getNodeInstances(ServerModeEnum.QUERY);
+ assertEquals(1, nodeInstances.size());
+
+ overwriteSystemProp("kylin.metadata.distributed-lock-impl",
"org.apache.kylin.common.lock.LocalLockFactory");
+ nodeRegistryService.scheduleCheck();
+ List<NodeRegistry.NodeInstance> newNodeInstances =
manager.getNodeRegistry()
+ .getNodeInstances(ServerModeEnum.QUERY);
+ assertTrue(newNodeInstances.isEmpty());
+ }
+
+ @Test
+ public void testPreDestroy() {
+
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(UnitOfWorkParams.builder().unitName(UnitOfWork.GLOBAL_UNIT)
+
.projectId(StringUtils.EMPTY).readonly(false).maxRetry(1).processor(() -> {
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ manager.createNodeRegistryIfNotExists();
+ return null;
+ }).build());
+
+ NodeRegistryManager manager =
NodeRegistryManager.getInstance(getTestConfig());
+ List<NodeRegistry.NodeInstance> nodeInstances =
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL);
+ assertEquals(1, nodeInstances.size());
+
+ nodeRegistryService.preDestroy();
+ assertNull(manager.getNodeRegistry());
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
index 1fd321f64e..ea1863ab1f 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
@@ -22,12 +22,17 @@ import org.apache.kylin.common.{KapConfig, KylinConfig}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-object ZKHelper extends Logging{
+/**
+ * Try zookeeper jaas config for distributed lock
+ * [[org.apache.kylin.common.lock.curator.CuratorDistributedLockFactory]]
+ */
+object ZKHelper extends Logging {
private val YARN_CLUSTER: String = "cluster"
def tryZKJaasConfiguration(ss: SparkSession): Unit = {
val config = KylinConfig.getInstanceFromEnv
- if (YARN_CLUSTER.equals(config.getDeployMode)) {
+ if (YARN_CLUSTER.equals(config.getDeployMode)
+ &&
config.getDistributedLockFactoryFullClassName.contains("CuratorDistributedLockFactory"))
{
val kapConfig = KapConfig.wrap(config)
if (KapConfig.FI_PLATFORM.equals(kapConfig.getKerberosPlatform) ||
KapConfig.TDH_PLATFORM.equals(kapConfig.getKerberosPlatform)) {
val sparkConf = ss.sparkContext.getConf