This is an automated email from the ASF dual-hosted git repository.

pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new a5b294ebb0 KYLIN-6071 Provide new jdbc service discovery (#2320)
a5b294ebb0 is described below

commit a5b294ebb0d19462053c949177fbc0e374287a8c
Author: Guoliang Sun <[email protected]>
AuthorDate: Thu Feb 27 18:44:29 2025 +0800

    KYLIN-6071 Provide new jdbc service discovery (#2320)
    
    Co-authored-by: Yinghao Lin <[email protected]>
---
 build/sbin/bootstrap.sh                            |  10 +-
 build/sbin/check-2000-zookeeper-role.sh            |  17 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  33 +++-
 .../apache/kylin/common/util/ClusterConstant.java  |   8 +
 .../src/main/resources/kylin-defaults0.properties  |   4 +
 .../apache/kylin/metadata/system/NodeRegistry.java | 172 +++++++++++++++++++++
 .../kylin/metadata/system/NodeRegistryManager.java | 135 ++++++++++++++++
 .../metadata/system/NodeRegistryManagerTest.java   |  64 ++++++++
 .../kylin/metadata/system/NodeRegistryTest.java    | 148 ++++++++++++++++++
 .../apache/kylin/rest/ZookeeperClusterManager.java |   2 +
 .../kylin/rest/ZookeeperHostInfoFetcher.java       |   2 +
 .../apache/kylin/rest/config/ZookeeperConfig.java  |   2 +
 .../ConditionalOnNodeRegistryJdbcEnabled.java}     |  28 ++--
 ...ConditionalOnNodeRegistryZookeeperEnabled.java} |  28 ++--
 .../JdbcClusterManager.java}                       |  49 +++---
 .../rest/discovery/KylinServiceDiscoveryCache.java |   1 +
 .../discovery/KylinServiceDiscoveryClient.java     |   1 +
 .../kylin/rest/discovery/NodeRegistryService.java  | 147 ++++++++++++++++++
 src/server/src/main/resources/application.yaml     |   1 +
 .../kylin/rest/config/ZookeeperConfigTest.java     |   1 +
 .../rest/discovery/JdbcClusterManagerTest.java     | 142 +++++++++++++++++
 .../rest/discovery/NodeRegistryServiceTest.java    | 162 +++++++++++++++++++
 .../kylin/engine/spark/builder/ZKHelper.scala      |   9 +-
 23 files changed, 1090 insertions(+), 76 deletions(-)

diff --git a/build/sbin/bootstrap.sh b/build/sbin/bootstrap.sh
index 770ae4de94..99f6490f07 100755
--- a/build/sbin/bootstrap.sh
+++ b/build/sbin/bootstrap.sh
@@ -144,13 +144,11 @@ function skipCheckOrNot() {
 }
 
 function checkZookeeperRole() {
-    # this is necessary in FI
-    source "${KYLIN_HOME}"/sbin/load-zookeeper-config.sh
-    if [[ $(skipCheckOrNot $KYLIN_SKIP_CHECK_MODE) ]]; then
-        return 0
+
+    KYLIN_ZOOKEEPER_ENABLED=$("$KYLIN_HOME"/bin/get-properties.sh kylin.env.zookeeper.enabled)
+    if [[ $KYLIN_ZOOKEEPER_ENABLED == "true" ]]; then
+        source ${KYLIN_HOME}/sbin/check-2000-zookeeper-role.sh
     fi
-    verboseLog "checking zookeeper role"
-    source "${KYLIN_HOME}"/sbin/check-2000-zookeeper-role.sh
 }
 
 function checkSparkDir() {
diff --git a/build/sbin/check-2000-zookeeper-role.sh b/build/sbin/check-2000-zookeeper-role.sh
index 3121c99553..fe57ba741a 100755
--- a/build/sbin/check-2000-zookeeper-role.sh
+++ b/build/sbin/check-2000-zookeeper-role.sh
@@ -21,10 +21,19 @@
 
 source $(cd -P -- "$(dirname -- "$0")" && pwd -P)/header.sh
 
-echo "Checking Zookeeper role..."
+source ${KYLIN_HOME}/sbin/init-kerberos.sh
+## init Kerberos if needed
+initKerberosIfNeeded
 
-zk_connect_string=$("${KYLIN_HOME}"/bin/get-properties.sh kylin.env.zookeeper-connect-string)
+KYLIN_ZOOKEEPER_ENABLED=$("$KYLIN_HOME"/bin/get-properties.sh kylin.env.zookeeper.enabled)
 
-if [[ -z $zk_connect_string ]]; then
-    quit "Failed: Zookeeper connect string is empty, please set 'kylin.env.zookeeper-connect-string' in {KYLIN_HOME}/conf/kylin.properties"
+if [[ $KYLIN_ZOOKEEPER_ENABLED == "true" ]]; then
+    echo "Checking Zookeeper role..."
+    zk_connect_string=$("${KYLIN_HOME}"/bin/get-properties.sh kylin.env.zookeeper-connect-string)
+    if [[ -z $zk_connect_string ]]; then
+      quit "Failed: Zookeeper connect string is empty, please set 'kylin.env.zookeeper-connect-string' in {KYLIN_HOME}/conf/kylin.properties"
+    fi
+else
+    echo "KYLIN_ZOOKEEPER_ENABLED is ${KYLIN_ZOOKEEPER_ENABLED}. Skip check."
+    exit 3
 fi
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ae408e0565..c4f124c451 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -670,9 +670,12 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public DistributedLockFactory getDistributedLockFactory() {
-        String clsName = getOptional("kylin.metadata.distributed-lock-impl",
+        return (DistributedLockFactory) ClassUtil.newInstance(getDistributedLockFactoryFullClassName());
+    }
+
+    public String getDistributedLockFactoryFullClassName() {
+        return getOptional("kylin.metadata.distributed-lock-impl",
                 "org.apache.kylin.common.lock.curator.CuratorDistributedLockFactory");
-        return (DistributedLockFactory) ClassUtil.newInstance(clsName);
     }
 
     public StorageURL getJDBCDistributedLockURL() {
@@ -2497,7 +2500,7 @@ public abstract class KylinConfigBase implements Serializable {
 
     public String getMetadataStoreType() {
         if (!isJobNode()) {
-            return this.getOptional("kylin.server.store-type", "hdfs");
+            return this.getOptional("kylin.server.store-type", "jdbc");
         } else {
             return "jdbc";
         }
@@ -4563,4 +4566,28 @@ public abstract class KylinConfigBase implements Serializable {
     public boolean isImprovedSumDecimalPrecisionEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.improved-sum-decimal-precision.enabled", FALSE));
     }
+
+    public String getServerNodeRegistryJdbcRenewCron() {
+        return getOptional("kylin.server.node-registry.jdbc.renew-cron", "0/30 * * * * *");
+    }
+
+    public String getServerNodeRegistryJdbcCheckCron() {
+        return getOptional("kylin.server.node-registry.jdbc.check-cron", "0 */2 * * * *");
+    }
+
+    public long getNodeRegistryJdbcExpireThreshold() {
+        return TimeUtil.timeStringAs(getOptional("kylin.server.node-registry.jdbc.expire-threshold", "90s"),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sleep 8s for preventing duplicated operation on other servers.
+     * For jdbc distributed lock implementation, the total locking duration
+     * should be smaller than {@link org.springframework.integration.jdbc.lock.DefaultLockRepository}'s ttl
+     * @return check idle time
+     */
+    public long getServerNodeRegistryJdbcCheckIdleTime() {
+        return TimeUtil.timeStringAs(getOptional("kylin.server.node-registry.jdbc.check-idle-time", "8s"),
+                TimeUnit.MILLISECONDS);
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java b/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
index b28d4d7be8..4e6f66473f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/ClusterConstant.java
@@ -48,5 +48,13 @@ public class ClusterConstant implements Serializable {
             this.name = name;
         }
 
+        public static ServerModeEnum of(String name) {
+            for (ServerModeEnum e : values()) {
+                if (e.name.equalsIgnoreCase(name)) {
+                    return e;
+                }
+            }
+            return null;
+        }
     }
 }
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index 9a367db672..bd4f910e79 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -30,6 +30,7 @@ kylin.env.hdfs-working-dir=/kylin
 # The build engine writes data to this file system, which is the same as defaultFS in core-site-xml by default
 kylin.env.engine-write-fs=
 # zookeeper is used for distributed locked, service discovery, leader selection, etc.
+kylin.env.zookeeper.enabled=true
 # example: kylin.env.zookeeper-connect-string=10.1.2.1:2181,10.1.2.2:2181,10.1.2.3:2181
 kylin.env.zookeeper-connect-string=
 # save for InfluxDB
@@ -59,6 +60,9 @@ kylin.server.https.keystore-password=changeit
 # Set true to enable CORS for any origins
 kylin.server.cors.allow-all=false
 
+# Kylin service discovery type
+kylin.server.node-registry.type=zookeeper
+
 # ==================== JOB EXECUTION ====================
 
 ### Spark conf overwrite for build engine
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistry.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistry.java
new file mode 100644
index 0000000000..c825cb1eb2
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistry.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.MetadataType;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+@Getter
+@JsonAutoDetect(getterVisibility = NONE, isGetterVisibility = NONE, setterVisibility = NONE, fieldVisibility = NONE)
+@Slf4j
+public class NodeRegistry extends RootPersistentEntity {
+
+    public static final String NODE_REGISTRY = "node_registry";
+
+    @Getter
+    @JsonAutoDetect(getterVisibility = NONE, isGetterVisibility = NONE, setterVisibility = NONE, fieldVisibility = NONE)
+    @EqualsAndHashCode(callSuper = false)
+    @ToString
+    public static class NodeInstance {
+
+        public NodeInstance() {
+            // Non-args constructor for jackson deserializing
+        }
+
+        public NodeInstance(String host, int port, String modeStr) {
+            this(host, port, ServerModeEnum.valueOf(modeStr));
+        }
+
+        public NodeInstance(String host, int port, ServerModeEnum mode) {
+            this.host = host;
+            this.port = port;
+            this.serverMode = mode;
+            this.lastHeartbeatTS = System.currentTimeMillis();
+        }
+
+        @JsonProperty("host")
+        private String host;
+
+        @JsonProperty("port")
+        private int port;
+
+        @JsonProperty("server_mode")
+        private ServerModeEnum serverMode;
+
+        @JsonProperty("last_heartbeat_ts")
+        @EqualsAndHashCode.Exclude
+        private long lastHeartbeatTS;
+
+        public boolean hostAndPortEquals(NodeInstance that) {
+            return Objects.equals(host, that.host) && port == that.port;
+        }
+    }
+
+    @JsonProperty("name")
+    private final String name = NODE_REGISTRY;
+
+    @JsonProperty("registry")
+    private final Map<ServerModeEnum, List<NodeInstance>> registry = new HashMap<>();
+
+    @JsonProperty("last_cleanup_ts")
+    private long lastCleanupTS;
+
+    public List<NodeInstance> getNodeInstances(ServerModeEnum mode) {
+        return registry.getOrDefault(mode, new ArrayList<>());
+    }
+
+    public void registerOrUpdate(NodeInstance instance) {
+        Preconditions.checkNotNull(instance);
+        List<NodeInstance> nodeInstances = registry.computeIfAbsent(instance.getServerMode(), k -> new ArrayList<>());
+        if (nodeInstances.isEmpty()) {
+            nodeInstances.add(instance);
+        } else {
+            int indexFound = findIndexByHostAndPort(nodeInstances, instance);
+            if (indexFound > -1) {
+                nodeInstances.set(indexFound, instance);
+            } else {
+                nodeInstances.add(instance);
+            }
+        }
+    }
+
+    public void deregister(NodeInstance instance) {
+        Preconditions.checkNotNull(instance);
+        List<NodeInstance> nodeInstances = registry.get(instance.getServerMode());
+        if (nodeInstances != null && !nodeInstances.isEmpty()) {
+            int indexFound = findIndexByHostAndPort(nodeInstances, instance);
+            if (indexFound > -1) {
+                nodeInstances.remove(indexFound);
+            }
+        }
+    }
+
+    public void removeExpiredRegistrations() {
+        long now = System.currentTimeMillis();
+        long expireThreshold = KylinConfig.getInstanceFromEnv().getNodeRegistryJdbcExpireThreshold();
+        long timeoutTS = now - expireThreshold;
+        for (Map.Entry<ServerModeEnum, List<NodeInstance>> entry : registry.entrySet()) {
+            List<NodeInstance> nodeInstances = entry.getValue();
+            if (nodeInstances != null && !nodeInstances.isEmpty()) {
+                nodeInstances.removeIf(node -> node.getLastHeartbeatTS() < timeoutTS);
+            }
+        }
+        lastCleanupTS = now;
+    }
+
+    public boolean isEmpty() {
+        if (registry.isEmpty()) {
+            return true;
+        }
+        for (List<NodeInstance> nodeInstances : registry.values()) {
+            if (!nodeInstances.isEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public String resourceName() {
+        return NODE_REGISTRY;
+    }
+
+    @Override
+    public MetadataType resourceType() {
+        return MetadataType.SYSTEM;
+    }
+
+    private static int findIndexByHostAndPort(List<NodeInstance> nodeInstances, NodeInstance instance) {
+        for (int i = 0; i < nodeInstances.size(); i++) {
+            NodeInstance ins = nodeInstances.get(i);
+            if (ins.hostAndPortEquals(instance)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistryManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistryManager.java
new file mode 100644
index 0000000000..72390c8e52
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/system/NodeRegistryManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.apache.kylin.metadata.system.NodeRegistry.NODE_REGISTRY;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.MetadataType;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class NodeRegistryManager {
+
+    public static NodeRegistryManager getInstance(KylinConfig config) {
+        return config.getManager(NodeRegistryManager.class);
+    }
+
+    // called by reflection
+    static NodeRegistryManager newInstance(KylinConfig config) {
+        return new NodeRegistryManager(config);
+    }
+
+    private final KylinConfig config;
+    private final CachedCrudAssist<NodeRegistry> crud;
+    private final String host;
+    private final int port;
+    private final ServerModeEnum serverMode;
+
+    public NodeRegistryManager(KylinConfig config) {
+        if (!UnitOfWork.isAlreadyInTransaction()) {
+            log.info("Initializing DiscoveryManager with KylinConfig Id: {}", System.identityHashCode(config));
+        }
+        this.config = config;
+        crud = new CachedCrudAssist<NodeRegistry>(getStore(), MetadataType.SYSTEM, null, NodeRegistry.class) {
+            @Override
+            protected NodeRegistry initEntityAfterReload(NodeRegistry entity, String projectName) {
+                return entity;
+            }
+        };
+        host = AddressUtil.getLocalHostExactAddress();
+        port = Integer.parseInt(config.getServerPort());
+        serverMode = ServerModeEnum.of(config.getServerMode());
+    }
+
+    public NodeRegistry getNodeRegistry() {
+        return crud.get(NODE_REGISTRY);
+    }
+
+    public List<NodeRegistry.NodeInstance> getNodeInstances(ServerModeEnum mode) {
+        return getNodeRegistry().getNodeInstances(mode);
+    }
+
+    public NodeRegistry.NodeInstance getLocalNodeInstance() {
+        return new NodeRegistry.NodeInstance(host, port, serverMode);
+    }
+
+    public void createNodeRegistryIfNotExists() {
+        NodeRegistry nodeRegistry = getNodeRegistry();
+        if (nodeRegistry != null) {
+            renew();
+        } else {
+            NodeRegistry newNodeRegistry = copyForWrite(new NodeRegistry());
+            newNodeRegistry.registerOrUpdate(newNodeInstance());
+            crud.save(newNodeRegistry);
+        }
+    }
+
+    public void renew() {
+        NodeRegistry nodeRegistry = copyForWrite(getNodeRegistry());
+        nodeRegistry.registerOrUpdate(newNodeInstance());
+        crud.save(nodeRegistry);
+    }
+
+    public void checkAndClean() {
+        NodeRegistry nodeRegistry = getNodeRegistry();
+        if (nodeRegistry == null) {
+            return;
+        }
+        nodeRegistry = copyForWrite(nodeRegistry);
+        nodeRegistry.removeExpiredRegistrations();
+        crud.save(nodeRegistry);
+    }
+
+    public void cleanup() {
+        NodeRegistry nodeRegistry = getNodeRegistry();
+        if (nodeRegistry == null) {
+            return;
+        }
+        nodeRegistry = copyForWrite(nodeRegistry);
+        nodeRegistry.deregister(newNodeInstance());
+        if (nodeRegistry.isEmpty()) {
+            crud.delete(nodeRegistry);
+        } else {
+            crud.save(nodeRegistry);
+        }
+    }
+
+    public NodeRegistry copyForWrite(NodeRegistry nodeRegistry) {
+        Preconditions.checkNotNull(nodeRegistry);
+        return crud.copyForWrite(nodeRegistry);
+    }
+
+    ResourceStore getStore() {
+        return ResourceStore.getKylinMetaStore(config);
+    }
+
+    private NodeRegistry.NodeInstance newNodeInstance() {
+        return new NodeRegistry.NodeInstance(host, port, serverMode);
+    }
+}
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryManagerTest.java
new file mode 100644
index 0000000000..86830a8fbf
--- /dev/null
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryManagerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.ClusterConstant;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NodeRegistryManagerTest extends NLocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        createTestMetadata();
+        overwriteSystemProp("kylin.server.mode", "all");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testCreateNodeRegistry() throws Exception {
+        NodeRegistryManager manager = NodeRegistryManager.getInstance(getTestConfig());
+        manager.createNodeRegistryIfNotExists();
+
+        List<NodeRegistry.NodeInstance> nodeInstances = manager.getNodeRegistry()
+                .getNodeInstances(ClusterConstant.ServerModeEnum.ALL);
+        assertEquals(1, nodeInstances.size());
+        NodeRegistry.NodeInstance instance = nodeInstances.get(0);
+
+        // test renew condition on registry existing
+        manager.createNodeRegistryIfNotExists();
+        List<NodeRegistry.NodeInstance> newNodeInstances = manager.getNodeRegistry()
+                .getNodeInstances(ClusterConstant.ServerModeEnum.ALL);
+        assertEquals(1, newNodeInstances.size());
+        NodeRegistry.NodeInstance newInstance = newNodeInstances.get(0);
+
+        assertTrue(newInstance.getLastHeartbeatTS() > instance.getLastHeartbeatTS());
+    }
+}
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryTest.java
new file mode 100644
index 0000000000..343f5a87d3
--- /dev/null
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/system/NodeRegistryTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.system;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class NodeRegistryTest extends NLocalFileMetadataTestCase {
+
+    private NodeRegistry nodeRegistry;
+    private NodeRegistry.NodeInstance instanceAll1;
+    private NodeRegistry.NodeInstance instanceAll2;
+    private NodeRegistry.NodeInstance instanceJob1;
+
+    @Before
+    public void setup() throws Exception {
+        nodeRegistry = new NodeRegistry();
+        instanceAll1 = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.ALL);
+        instanceAll2 = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.ALL);
+        instanceJob1 = new NodeRegistry.NodeInstance("host2", 8080, ServerModeEnum.JOB);
+
+        // Mock KylinConfig to control expiration threshold
+        createTestMetadata();
+        overwriteSystemProp("kylin.server.node-registry.jdbc.expire-threshold", "1s");
+    }
+
+    @After
+    public void tearDown() {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testRegisterOrUpdate_AddNewNode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        List<NodeRegistry.NodeInstance> instances = nodeRegistry.getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, instances.size());
+        assertEquals(instanceAll1, instances.get(0));
+    }
+
+    @Test
+    public void testRegisterOrUpdate_UpdateExistingNode() throws InterruptedException {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        // Simulate update with new heartbeat time
+        NodeRegistry.NodeInstance updatedInstance = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.ALL);
+        nodeRegistry.registerOrUpdate(updatedInstance);
+
+        List<NodeRegistry.NodeInstance> instances = nodeRegistry.getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, instances.size());
+        assertNotEquals(instanceAll1.getLastHeartbeatTS(), instances.get(0).getLastHeartbeatTS());
+    }
+
+    @Test
+    public void testDeregister_ExistingNode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        nodeRegistry.deregister(instanceAll1);
+        assertTrue(nodeRegistry.getNodeInstances(ServerModeEnum.ALL).isEmpty());
+    }
+
+    @Test
+    public void testDeregister_NonExistingNode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        nodeRegistry.deregister(instanceJob1); // Different mode
+        assertEquals(1, nodeRegistry.getNodeInstances(ServerModeEnum.ALL).size());
+    }
+
+    @Test
+    public void testRemoveExpiredRegistrations() throws Exception {
+        // Create expired instance (older than threshold)
+        NodeRegistry.NodeInstance expiredInstance = new NodeRegistry.NodeInstance("host3", 9090, ServerModeEnum.QUERY);
+        ReflectionTestUtils.setField(expiredInstance, "lastHeartbeatTS", System.currentTimeMillis() - 2000);
+
+        // Create valid instance
+        NodeRegistry.NodeInstance validInstance = new NodeRegistry.NodeInstance("host4", 9091, ServerModeEnum.QUERY);
+        ReflectionTestUtils.setField(validInstance, "lastHeartbeatTS", System.currentTimeMillis() - 500);
+
+        nodeRegistry.registerOrUpdate(expiredInstance);
+        nodeRegistry.registerOrUpdate(validInstance);
+
+        nodeRegistry.removeExpiredRegistrations();
+
+        List<NodeRegistry.NodeInstance> instances = nodeRegistry.getNodeInstances(ServerModeEnum.QUERY);
+        assertEquals(1, instances.size());
+        assertEquals(validInstance, instances.get(0));
+    }
+
+    @Test
+    public void testIsEmpty() {
+        assertTrue(nodeRegistry.isEmpty());
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        assertFalse(nodeRegistry.isEmpty());
+        nodeRegistry.deregister(instanceAll1);
+        assertTrue(nodeRegistry.isEmpty());
+    }
+
+    @Test
+    public void testNodeInstanceEqualsAndHashCode() {
+        assertEquals(instanceAll1, instanceAll2);
+        assertEquals(instanceAll1.hashCode(), instanceAll2.hashCode());
+
+        NodeRegistry.NodeInstance differentPort = new NodeRegistry.NodeInstance("host1", 7071, ServerModeEnum.ALL);
+        assertNotEquals(instanceAll1, differentPort);
+    }
+
+    @Test
+    public void testHostAndPortEquals() {
+        NodeRegistry.NodeInstance differentMode = new NodeRegistry.NodeInstance("host1", 7070, ServerModeEnum.JOB);
+        assertTrue(instanceAll1.hostAndPortEquals(differentMode));
+    }
+
+    @Test
+    public void testGetNodeInstancesByMode() {
+        nodeRegistry.registerOrUpdate(instanceAll1);
+        nodeRegistry.registerOrUpdate(instanceJob1);
+
+        List<NodeRegistry.NodeInstance> allInstances = nodeRegistry.getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, allInstances.size());
+
+        List<NodeRegistry.NodeInstance> jobInstances = nodeRegistry.getNodeInstances(ServerModeEnum.JOB);
+        assertEquals(1, jobInstances.size());
+    }
+}
\ No newline at end of file
diff --git a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
index 50761c7b2b..66a56d93c6 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.kylin.rest.cluster.ClusterManager;
+import org.apache.kylin.rest.discovery.ConditionalOnNodeRegistryZookeeperEnabled;
 import org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache;
 import org.apache.kylin.rest.discovery.KylinServiceDiscoveryClient;
 import org.apache.kylin.rest.response.ServerInfoResponse;
@@ -37,6 +38,7 @@ import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @ConditionalOnZookeeperEnabled // if missing, a default impl will be filled by AppConfig.clusterManager()
+@ConditionalOnNodeRegistryZookeeperEnabled
 @Component
 @Slf4j
 public class ZookeeperClusterManager implements ClusterManager {
diff --git a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
index 658481e8ae..e23646288c 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.rest;
 
 import org.apache.kylin.common.util.HostInfoFetcher;
+import org.apache.kylin.rest.discovery.ConditionalOnNodeRegistryZookeeperEnabled;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.cloud.commons.util.InetUtils;
 import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
@@ -26,6 +27,7 @@ import org.springframework.stereotype.Component;
 import lombok.extern.slf4j.Slf4j;
 
 @ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
 @Component
 @Slf4j
 public class ZookeeperHostInfoFetcher implements HostInfoFetcher {
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java 
b/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
index f93632b8f1..40cac84bcc 100644
--- a/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
+++ b/src/server/src/main/java/org/apache/kylin/rest/config/ZookeeperConfig.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.config;
 
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.kylin.common.util.ZookeeperAclBuilder;
+import 
org.apache.kylin.rest.discovery.ConditionalOnNodeRegistryZookeeperEnabled;
 import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
 import org.springframework.cloud.zookeeper.CuratorFrameworkCustomizer;
 import org.springframework.context.annotation.Configuration;
@@ -29,6 +30,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 @Configuration(proxyBeanMethods = false)
 @ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
 public class ZookeeperConfig implements CuratorFrameworkCustomizer {
 
     @Override
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryJdbcEnabled.java
similarity index 55%
copy from 
src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
copy to 
src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryJdbcEnabled.java
index 658481e8ae..8f2821e7c4 100644
--- 
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
+++ 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryJdbcEnabled.java
@@ -15,26 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kylin.rest;
 
-import org.apache.kylin.common.util.HostInfoFetcher;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.commons.util.InetUtils;
-import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
-import org.springframework.stereotype.Component;
+package org.apache.kylin.rest.discovery;
 
-import lombok.extern.slf4j.Slf4j;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-@ConditionalOnZookeeperEnabled
-@Component
-@Slf4j
-public class ZookeeperHostInfoFetcher implements HostInfoFetcher {
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 
-    @Autowired
-    InetUtils inetUtils;
-
-    @Override
-    public String getHostname() {
-        return inetUtils.findFirstNonLoopbackHostInfo().getHostname();
-    }
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE })
+@ConditionalOnProperty(name = "kylin.server.node-registry.type", havingValue = 
"jdbc")
+public @interface ConditionalOnNodeRegistryJdbcEnabled {
 }
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryZookeeperEnabled.java
similarity index 55%
copy from 
src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
copy to 
src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryZookeeperEnabled.java
index 658481e8ae..c9d884062a 100644
--- 
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperHostInfoFetcher.java
+++ 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/ConditionalOnNodeRegistryZookeeperEnabled.java
@@ -15,26 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kylin.rest;
 
-import org.apache.kylin.common.util.HostInfoFetcher;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.commons.util.InetUtils;
-import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
-import org.springframework.stereotype.Component;
+package org.apache.kylin.rest.discovery;
 
-import lombok.extern.slf4j.Slf4j;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
-@ConditionalOnZookeeperEnabled
-@Component
-@Slf4j
-public class ZookeeperHostInfoFetcher implements HostInfoFetcher {
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 
-    @Autowired
-    InetUtils inetUtils;
-
-    @Override
-    public String getHostname() {
-        return inetUtils.findFirstNonLoopbackHostInfo().getHostname();
-    }
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.TYPE })
+@ConditionalOnProperty(name = "kylin.server.node-registry.type", havingValue = 
"zookeeper")
+public @interface ConditionalOnNodeRegistryZookeeperEnabled {
 }
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/JdbcClusterManager.java
similarity index 57%
copy from 
src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
copy to 
src/server/src/main/java/org/apache/kylin/rest/discovery/JdbcClusterManager.java
index 50761c7b2b..0be7595e29 100644
--- 
a/src/server/src/main/java/org/apache/kylin/rest/ZookeeperClusterManager.java
+++ 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/JdbcClusterManager.java
@@ -15,44 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kylin.rest;
+
+package org.apache.kylin.rest.discovery;
 
 import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.lang3.ArrayUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClusterConstant;
+import org.apache.kylin.metadata.system.NodeRegistry;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
 import org.apache.kylin.rest.cluster.ClusterManager;
-import org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache;
-import org.apache.kylin.rest.discovery.KylinServiceDiscoveryClient;
 import org.apache.kylin.rest.response.ServerInfoResponse;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.zookeeper.ConditionalOnZookeeperEnabled;
 import org.springframework.stereotype.Component;
 
-import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
-@ConditionalOnZookeeperEnabled // if missing, a default impl will be filled by 
AppConfig.clusterManager()
+@ConditionalOnNodeRegistryJdbcEnabled
 @Component
 @Slf4j
-public class ZookeeperClusterManager implements ClusterManager {
-
-    @Autowired
-    private KylinServiceDiscoveryCache serviceCache;
-
-    @Autowired
-    private KylinServiceDiscoveryClient discoveryClient;
-
-    public ZookeeperClusterManager() {
-    }
+public class JdbcClusterManager implements ClusterManager {
 
     @Override
     public String getLocalServer() {
-        return discoveryClient.getLocalServiceServer();
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+        NodeRegistry.NodeInstance localInstance = 
manager.getLocalNodeInstance();
+        return instance2Str(localInstance);
     }
 
     @Override
@@ -75,17 +69,22 @@ public class ZookeeperClusterManager implements 
ClusterManager {
         return getServerByMode(ServerModeEnum.ALL, ServerModeEnum.JOB, 
ServerModeEnum.QUERY);
     }
 
-    private List<ServerInfoResponse> getServerByMode(@Nullable 
ServerModeEnum... serverModeEnum) {
+    private static List<ServerInfoResponse> getServerByMode(
+            @Nullable ClusterConstant.ServerModeEnum... serverModeEnum) {
         List<ServerInfoResponse> servers = new ArrayList<>();
-
-        if (ArrayUtils.isEmpty(serverModeEnum)) {
+        if (serverModeEnum == null || serverModeEnum.length == 0) {
             return servers;
         }
-
-        for (val nodeModeType : serverModeEnum) {
-            
servers.addAll(serviceCache.getServerInfoByServerMode(nodeModeType));
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+        for (ClusterConstant.ServerModeEnum mode : serverModeEnum) {
+            servers.addAll(manager.getNodeInstances(mode).stream()
+                    .map(instance -> new 
ServerInfoResponse(instance2Str(instance), instance.getServerMode().getName()))
+                    .collect(Collectors.toList()));
         }
         return servers;
     }
 
+    private static String instance2Str(NodeRegistry.NodeInstance instance) {
+        return String.format(Locale.ROOT, "%s:%s", instance.getHost(), 
instance.getPort());
+    }
 }
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
index e1b3ac24fb..43f97c1c90 100644
--- 
a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
+++ 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryCache.java
@@ -61,6 +61,7 @@ import org.springframework.stereotype.Component;
 import lombok.val;
 
 @ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
 @Component
 public class KylinServiceDiscoveryCache implements KylinServiceDiscovery {
     private static final Logger logger = 
LoggerFactory.getLogger(KylinServiceDiscoveryCache.class);
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
index 6422566ec9..a8a5075fe8 100644
--- 
a/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
+++ 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/KylinServiceDiscoveryClient.java
@@ -40,6 +40,7 @@ import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
 @ConditionalOnZookeeperEnabled
+@ConditionalOnNodeRegistryZookeeperEnabled
 @Component
 @Slf4j
 public class KylinServiceDiscoveryClient implements KylinServiceDiscovery {
diff --git 
a/src/server/src/main/java/org/apache/kylin/rest/discovery/NodeRegistryService.java
 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/NodeRegistryService.java
new file mode 100644
index 0000000000..66b4a504a4
--- /dev/null
+++ 
b/src/server/src/main/java/org/apache/kylin/rest/discovery/NodeRegistryService.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.discovery;
+
+import java.util.TimeZone;
+import java.util.concurrent.locks.Lock;
+
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.ibatis.exceptions.PersistenceException;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.TransactionException;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
+import org.apache.kylin.rest.aspect.Transaction;
+import org.apache.kylin.rest.util.SpringContext;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.stereotype.Service;
+
+import lombok.extern.slf4j.Slf4j;
+
+@ConditionalOnNodeRegistryJdbcEnabled
+@Service
+@Slf4j
+public class NodeRegistryService {
+
+    private ThreadPoolTaskScheduler scheduler;
+
+    @EventListener(ApplicationReadyEvent.class)
+    public void initOnApplicationReady(ApplicationReadyEvent event) {
+        log.info("NodeRegistryService initOnApplicationReady");
+        
SpringContext.getBean(NodeRegistryService.class).tryCreateNodeRegistry();
+
+        // Since the spring scheduler annotation is skipped on query mode 
according to SchedulerEnhancer,
+        // an independent task scheduler is used.
+        // See org.apache.kylin.rest.aspect.SchedulerEnhancer for more details.
+        scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(2);
+        scheduler.setThreadNamePrefix("NodeRegistryScheduler-");
+        scheduler.initialize();
+        
SpringContext.getBean(NodeRegistryService.class).manuallyScheduleTasks();
+    }
+
+    public void manuallyScheduleTasks() {
+        if (scheduler == null) {
+            throw new IllegalStateException("scheduler not initialized");
+        }
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        String renewCron = config.getServerNodeRegistryJdbcRenewCron();
+        String checkCron = config.getServerNodeRegistryJdbcCheckCron();
+        scheduler.schedule(() -> 
SpringContext.getBean(NodeRegistryService.class).scheduleRenew(),
+                new CronTrigger(renewCron, TimeZone.getDefault()));
+        scheduler.schedule(() -> 
SpringContext.getBean(NodeRegistryService.class).scheduleCheck(),
+                new CronTrigger(checkCron, TimeZone.getDefault()));
+    }
+
+    @PreDestroy
+    public void preDestroy() {
+        log.info("NodeRegistryService preDestroy");
+        
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(UnitOfWorkParams.builder().unitName(UnitOfWork.GLOBAL_UNIT)
+                
.projectId(StringUtils.EMPTY).readonly(false).maxRetry(1).processor(() -> {
+                    try {
+                        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+                        manager.cleanup();
+                        return null;
+                    } catch (Throwable throwable) {
+                        throw new TransactionException(throwable);
+                    }
+                }).build());
+    }
+
+    public void scheduleRenew() {
+        log.debug("Start renew registry");
+        SpringContext.getBean(NodeRegistryService.class).renew();
+        log.debug("End renew registry");
+    }
+
+    public void scheduleCheck() {
+        Lock lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory()
+                .lockForCurrentProcess("node-registry-check-lock");
+        if (lock.tryLock()) {
+            try {
+                log.debug("Start check registry");
+                SpringContext.getBean(NodeRegistryService.class).check();
+                
sleep(KylinConfig.getInstanceFromEnv().getServerNodeRegistryJdbcCheckIdleTime());
+            } catch (Exception e) {
+                log.warn("Check registry failed", e);
+            } finally {
+                lock.unlock();
+                log.debug("End check registry");
+            }
+        }
+    }
+
+    @Transaction
+    public void tryCreateNodeRegistry() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+        try {
+            manager.createNodeRegistryIfNotExists();
+        } catch (PersistenceException e) {
+            log.warn("SYSTEM/node_registry is most likely existing");
+        }
+    }
+
+    @Transaction
+    public void renew() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+        manager.renew();
+    }
+
+    @Transaction(retry = 1)
+    public void check() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(KylinConfig.getInstanceFromEnv());
+        manager.checkAndClean();
+    }
+
+    private static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            log.warn("Thread sleeping was interrupted");
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/src/server/src/main/resources/application.yaml 
b/src/server/src/main/resources/application.yaml
index c4d6f36ed4..300f755173 100644
--- a/src/server/src/main/resources/application.yaml
+++ b/src/server/src/main/resources/application.yaml
@@ -89,6 +89,7 @@ spring:
         composite-indicator:
           enabled: false
     zookeeper:
+      enabled: ${kylin.env.zookeeper.enabled}
       connect-string: ${kylin.env.zookeeper-connect-string}
       discovery:
         root: /kylin/${kylin.metadata.url.unique-id}/services
diff --git 
a/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
 
b/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
index 008d0de076..d0f1f93f7a 100644
--- 
a/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
+++ 
b/src/server/src/test/java/org/apache/kylin/rest/config/ZookeeperConfigTest.java
@@ -46,6 +46,7 @@ class ZookeeperConfigTest {
 
     @Test
     @OverwriteProp.OverwriteProps({ //
+            @OverwriteProp(key = "kylin.server.node-registry.type", value = 
"zookeeper"),
             @OverwriteProp(key = "kylin.env.zookeeper-acl-enabled", value = 
"true"),
             @OverwriteProp(key = "kylin.env.zookeeper.zk-auth", value = 
"digest:ADMIN:KYLIN"),
             @OverwriteProp(key = "kylin.env.zookeeper.zk-acl", value = 
"world:anyone:rwcda") })
diff --git 
a/src/server/src/test/java/org/apache/kylin/rest/discovery/JdbcClusterManagerTest.java
 
b/src/server/src/test/java/org/apache/kylin/rest/discovery/JdbcClusterManagerTest.java
new file mode 100644
index 0000000000..4573c2f1c1
--- /dev/null
+++ 
b/src/server/src/test/java/org/apache/kylin/rest/discovery/JdbcClusterManagerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.discovery;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.kylin.common.util.ClusterConstant;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.system.NodeRegistry;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
+import org.apache.kylin.rest.response.ServerInfoResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class JdbcClusterManagerTest extends NLocalFileMetadataTestCase {
+
+    private JdbcClusterManager jdbcClusterManager;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUp() throws Exception {
+        createTestMetadata();
+        overwriteSystemProp("kylin.server.mode", "all");
+
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        manager.createNodeRegistryIfNotExists();
+
+        CachedCrudAssist<NodeRegistry> crud = (CachedCrudAssist<NodeRegistry>) 
ReflectionTestUtils.getField(manager,
+                "crud");
+        assertNotNull(crud);
+
+        {
+            NodeRegistry nodeRegistry = 
crud.copyForWrite(manager.getNodeRegistry());
+            NodeRegistry.NodeInstance instance = new 
NodeRegistry.NodeInstance("123.123.123.101", 7070,
+                    ClusterConstant.ServerModeEnum.QUERY);
+            nodeRegistry.registerOrUpdate(instance);
+            crud.save(nodeRegistry);
+        }
+        {
+            NodeRegistry nodeRegistry = 
crud.copyForWrite(manager.getNodeRegistry());
+            NodeRegistry.NodeInstance instance = new 
NodeRegistry.NodeInstance("123.123.123.102", 7070,
+                    ClusterConstant.ServerModeEnum.JOB);
+            nodeRegistry.registerOrUpdate(instance);
+            crud.save(nodeRegistry);
+        }
+
+        assertEquals(3, 
manager.getNodeRegistry().getRegistry().values().size());
+        assertEquals(1, 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).size());
+        assertEquals(1, 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.QUERY).size());
+        assertEquals(1, 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.JOB).size());
+
+        jdbcClusterManager = new JdbcClusterManager();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetLocalServer() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        NodeRegistry.NodeInstance instance = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+        assertEquals(toIpPortStr(instance.getHost(), instance.getPort()), 
jdbcClusterManager.getLocalServer());
+    }
+
+    @Test
+    public void testGetQueryServers() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        NodeRegistry.NodeInstance all = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+        NodeRegistry.NodeInstance query = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.QUERY).get(0);
+
+        List<ServerInfoResponse> servers = 
jdbcClusterManager.getQueryServers();
+        assertEquals(2, servers.size());
+
+        assertEquals(toIpPortStr(all.getHost(), all.getPort()), 
servers.get(0).getHost());
+        assertEquals(all.getServerMode().getName(), servers.get(0).getMode());
+        assertEquals(toIpPortStr(query.getHost(), query.getPort()), 
servers.get(1).getHost());
+        assertEquals(query.getServerMode().getName(), 
servers.get(1).getMode());
+    }
+
+    @Test
+    public void testGetJobServers() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        NodeRegistry.NodeInstance all = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+        NodeRegistry.NodeInstance job = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.JOB).get(0);
+
+        List<ServerInfoResponse> servers = jdbcClusterManager.getJobServers();
+        assertEquals(2, servers.size());
+
+        assertEquals(toIpPortStr(all.getHost(), all.getPort()), 
servers.get(0).getHost());
+        assertEquals(all.getServerMode().getName(), servers.get(0).getMode());
+        assertEquals(toIpPortStr(job.getHost(), job.getPort()), 
servers.get(1).getHost());
+        assertEquals(job.getServerMode().getName(), servers.get(1).getMode());
+    }
+
+    @Test
+    public void testGetServers() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        NodeRegistry.NodeInstance all = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL).get(0);
+        NodeRegistry.NodeInstance job = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.JOB).get(0);
+        NodeRegistry.NodeInstance query = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.QUERY).get(0);
+
+        List<ServerInfoResponse> servers = 
jdbcClusterManager.getServersFromCache();
+        assertEquals(3, servers.size());
+
+        assertEquals(toIpPortStr(all.getHost(), all.getPort()), 
servers.get(0).getHost());
+        assertEquals(all.getServerMode().getName(), servers.get(0).getMode());
+        assertEquals(toIpPortStr(job.getHost(), job.getPort()), 
servers.get(1).getHost());
+        assertEquals(job.getServerMode().getName(), servers.get(1).getMode());
+        assertEquals(toIpPortStr(query.getHost(), query.getPort()), 
servers.get(2).getHost());
+        assertEquals(query.getServerMode().getName(), 
servers.get(2).getMode());
+    }
+
+    private static String toIpPortStr(String ip, int port) {
+        return String.format(Locale.ROOT, "%s:%s", ip, port);
+    }
+}
diff --git 
a/src/server/src/test/java/org/apache/kylin/rest/discovery/NodeRegistryServiceTest.java
 
b/src/server/src/test/java/org/apache/kylin/rest/discovery/NodeRegistryServiceTest.java
new file mode 100644
index 0000000000..7ca6dfdc6f
--- /dev/null
+++ 
b/src/server/src/test/java/org/apache/kylin/rest/discovery/NodeRegistryServiceTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.discovery;
+
+import static org.apache.kylin.common.util.ClusterConstant.ServerModeEnum;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.system.NodeRegistry;
+import org.apache.kylin.metadata.system.NodeRegistryManager;
+import org.apache.kylin.rest.util.SpringContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.context.ApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class NodeRegistryServiceTest extends NLocalFileMetadataTestCase {
+
+    private NodeRegistryService nodeRegistryService;
+
+    @Before
+    public void setUp() {
+        createTestMetadata();
+        overwriteSystemProp("kylin.env.zookeeper.enabled", "false");
+        overwriteSystemProp("kylin.server.node-registry.type", "jdbc");
+        overwriteSystemProp("kylin.metadata.distributed-lock-impl", 
"org.apache.kylin.common.lock.LocalLockFactory");
+        overwriteSystemProp("kylin.server.mode", "all");
+
+        nodeRegistryService = Mockito.spy(new NodeRegistryService());
+
+        ApplicationContext applicationContext = 
Mockito.mock(ApplicationContext.class);
+        
when(applicationContext.getBean(NodeRegistryService.class)).thenReturn(nodeRegistryService);
+        SpringContext.setApplicationContextImpl(applicationContext);
+    }
+
+    @After
+    public void tearDown() {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testInitOnApplicationReady() {
+        doNothing().when(nodeRegistryService).manuallyScheduleTasks();
+        nodeRegistryService.initOnApplicationReady(null);
+        verify(nodeRegistryService, times(1)).tryCreateNodeRegistry();
+        verify(nodeRegistryService, times(1)).manuallyScheduleTasks();
+        assertNotNull(ReflectionTestUtils.getField(nodeRegistryService, 
"scheduler"));
+    }
+
+    @Test
+    public void testManuallyScheduleTasks() {
+        try {
+            nodeRegistryService.manuallyScheduleTasks();
+        } catch (IllegalStateException e) {
+            assertTrue(e.getMessage().contains("scheduler not initialized"));
+        }
+
+        ThreadPoolTaskScheduler scheduler = 
mock(ThreadPoolTaskScheduler.class);
+        ReflectionTestUtils.setField(nodeRegistryService, "scheduler", 
scheduler);
+        nodeRegistryService.manuallyScheduleTasks();
+        verify(scheduler, times(2)).schedule(any(Runnable.class), 
any(CronTrigger.class));
+    }
+
+    @Test
+    public void testRenew() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        manager.createNodeRegistryIfNotExists();
+
+        List<NodeRegistry.NodeInstance> nodeInstances = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, nodeInstances.size());
+        NodeRegistry.NodeInstance instance = nodeInstances.get(0);
+
+        nodeRegistryService.scheduleRenew();
+        List<NodeRegistry.NodeInstance> newNodeInstances = 
manager.getNodeRegistry()
+                .getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, newNodeInstances.size());
+        NodeRegistry.NodeInstance newInstance = newNodeInstances.get(0);
+
+        assertTrue(newInstance.getLastHeartbeatTS() > 
instance.getLastHeartbeatTS());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCheckAndClean() {
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        manager.createNodeRegistryIfNotExists();
+
+        CachedCrudAssist<NodeRegistry> crud = (CachedCrudAssist<NodeRegistry>) 
ReflectionTestUtils.getField(manager,
+                "crud");
+        assertNotNull(crud);
+        NodeRegistry nodeRegistry = 
crud.copyForWrite(manager.getNodeRegistry());
+        NodeRegistry.NodeInstance timeoutInstance = new 
NodeRegistry.NodeInstance("123.123.123.123", 7070,
+                ServerModeEnum.QUERY);
+        ReflectionTestUtils.setField(timeoutInstance, "lastHeartbeatTS",
+                System.currentTimeMillis() - 
getTestConfig().getNodeRegistryJdbcExpireThreshold() - 500);
+        nodeRegistry.registerOrUpdate(timeoutInstance);
+        crud.save(nodeRegistry);
+
+        List<NodeRegistry.NodeInstance> nodeInstances = 
manager.getNodeRegistry()
+                .getNodeInstances(ServerModeEnum.QUERY);
+        assertEquals(1, nodeInstances.size());
+
+        overwriteSystemProp("kylin.metadata.distributed-lock-impl", 
"org.apache.kylin.common.lock.LocalLockFactory");
+        nodeRegistryService.scheduleCheck();
+        List<NodeRegistry.NodeInstance> newNodeInstances = 
manager.getNodeRegistry()
+                .getNodeInstances(ServerModeEnum.QUERY);
+        assertTrue(newNodeInstances.isEmpty());
+    }
+
+    @Test
+    public void testPreDestroy() {
+        
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(UnitOfWorkParams.builder().unitName(UnitOfWork.GLOBAL_UNIT)
+                
.projectId(StringUtils.EMPTY).readonly(false).maxRetry(1).processor(() -> {
+                    NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+                    manager.createNodeRegistryIfNotExists();
+                    return null;
+                }).build());
+
+        NodeRegistryManager manager = 
NodeRegistryManager.getInstance(getTestConfig());
+        List<NodeRegistry.NodeInstance> nodeInstances = 
manager.getNodeRegistry().getNodeInstances(ServerModeEnum.ALL);
+        assertEquals(1, nodeInstances.size());
+
+        nodeRegistryService.preDestroy();
+        assertNull(manager.getNodeRegistry());
+    }
+}
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
index 1fd321f64e..ea1863ab1f 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
@@ -22,12 +22,17 @@ import org.apache.kylin.common.{KapConfig, KylinConfig}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
-object ZKHelper extends Logging{
+/**
+ * Try zookeeper jaas config for distributed lock
+ * [[org.apache.kylin.common.lock.curator.CuratorDistributedLockFactory]]
+ */
+object ZKHelper extends Logging {
   private val YARN_CLUSTER: String = "cluster"
 
   def tryZKJaasConfiguration(ss: SparkSession): Unit = {
     val config = KylinConfig.getInstanceFromEnv
-    if (YARN_CLUSTER.equals(config.getDeployMode)) {
+    if (YARN_CLUSTER.equals(config.getDeployMode)
+      && 
config.getDistributedLockFactoryFullClassName.contains("CuratorDistributedLockFactory"))
 {
       val kapConfig = KapConfig.wrap(config)
       if (KapConfig.FI_PLATFORM.equals(kapConfig.getKerberosPlatform) || 
KapConfig.TDH_PLATFORM.equals(kapConfig.getKerberosPlatform)) {
         val sparkConf = ss.sparkContext.getConf

Reply via email to