This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d87a0d831c [Fix-14008][registry] Fix etcd memory leak due to leaseId 
(#14034)
d87a0d831c is described below

commit d87a0d831c7bbf768c2c80ab1e0020c3ebc5f4aa
Author: eye-gu <[email protected]>
AuthorDate: Fri Jul 28 22:07:37 2023 +0800

    [Fix-14008][registry] Fix etcd memory leak due to leaseId (#14034)
---
 .../registry/etcd/EtcdKeepAliveLeaseManager.java   | 79 ++++++++++++++++++++++
 .../plugin/registry/etcd/EtcdRegistry.java         |  8 ++-
 .../etcd/EtcdKeepAliveLeaseManagerTest.java        | 72 ++++++++++++++++++++
 3 files changed, 156 insertions(+), 3 deletions(-)

diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
new file mode 100644
index 0000000000..db34592bab
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.grpc.stub.StreamObserver;
+
+@Slf4j
+public class EtcdKeepAliveLeaseManager {
+
+    private final Map<String, Long> keyLeaseCache = new ConcurrentHashMap<>();
+
+    private final Client client;
+
+    EtcdKeepAliveLeaseManager(Client client) {
+        this.client = client;
+    }
+
+    long getOrCreateKeepAliveLease(String key, long timeToLive) {
+        return keyLeaseCache.computeIfAbsent(key, $ -> {
+            try {
+                long leaseId = 
client.getLeaseClient().grant(timeToLive).get().getID();
+                client.getLeaseClient().keepAlive(leaseId, new 
StreamObserver<LeaseKeepAliveResponse>() {
+
+                    @Override
+                    public void onNext(LeaseKeepAliveResponse value) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        log.error("Lease {} keep alive error, remove cache 
with key:{}", leaseId, key, t);
+                        keyLeaseCache.remove(key);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        log.error("Lease {} keep alive complete, remove cache 
with key:{}", leaseId, key);
+                        keyLeaseCache.remove(key);
+                    }
+                });
+                log.info("Lease {} keep alive create with key:{}", leaseId, 
key);
+                return leaseId;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RegistryException("Failed to create lease key: " + 
key, e);
+            } catch (ExecutionException e) {
+                throw new RegistryException("Failed to create lease key: " + 
key, e);
+            }
+        });
+    }
+
+    Optional<Long> getKeepAliveLease(String key) {
+        return Optional.ofNullable(keyLeaseCache.get(key));
+    }
+}
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
index 10d3dfc226..21e9253529 100644
--- 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
@@ -76,6 +76,9 @@ public class EtcdRegistry implements Registry {
 
     private final Client client;
     private EtcdConnectionStateListener etcdConnectionStateListener;
+
+    private EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
+
     public static final String FOLDER_SEPARATOR = "/";
     // save the lock info for thread
     // key:lockKey Value:leaseId
@@ -120,6 +123,7 @@ public class EtcdRegistry implements Registry {
         client = clientBuilder.build();
         log.info("Started Etcd Registry...");
         etcdConnectionStateListener = new EtcdConnectionStateListener(client);
+        etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
     }
 
     /**
@@ -206,9 +210,7 @@ public class EtcdRegistry implements Registry {
         try {
             if (deleteOnDisconnect) {
                 // keep the key by lease, if disconnected, the lease will 
expire and the key will delete
-                long leaseId = 
client.getLeaseClient().grant(TIME_TO_LIVE_SECONDS).get().getID();
-                client.getLeaseClient().keepAlive(leaseId, 
Observers.observer(response -> {
-                }));
+                long leaseId = 
etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
                 PutOption putOption = 
PutOption.newBuilder().withLeaseId(leaseId).build();
                 client.getKVClient().put(byteSequence(key), 
byteSequence(value), putOption).get();
             } else {
diff --git 
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
new file mode 100644
index 0000000000..70593e4bad
--- /dev/null
+++ 
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.test.EtcdClusterExtension;
+
+class EtcdKeepAliveLeaseManagerTest {
+
+    static EtcdClusterExtension server;
+
+    static Client client;
+
+    static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
+    @BeforeAll
+    public static void before() throws Exception {
+        server = EtcdClusterExtension.builder()
+                .withNodes(1)
+                .withImage("ibmcom/etcd:3.2.24")
+                .build();
+        server.restart();
+
+        client = Client.builder().endpoints(server.clientEndpoints()).build();
+
+        etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
+    }
+
+    @Test
+    void getOrCreateKeepAliveLeaseTest() throws Exception {
+        long first = 
etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
+        long second = 
etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
+        Assertions.assertEquals(first, second);
+
+        client.getLeaseClient().revoke(first).get();
+
+        // wait for lease expire
+        Thread.sleep(3000);
+        Optional<Long> keepAliveLease = 
etcdKeepAliveLeaseManager.getKeepAliveLease("/test");
+        Assertions.assertFalse(keepAliveLease.isPresent());
+    }
+
+    @AfterAll
+    public static void after() throws IOException {
+        try (EtcdCluster closeServer = server.cluster()) {
+            client.close();
+        }
+    }
+}

Reply via email to