This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d87a0d831c [Fix-14008][registry] Fix etcd memory leak due to leaseId
(#14034)
d87a0d831c is described below
commit d87a0d831c7bbf768c2c80ab1e0020c3ebc5f4aa
Author: eye-gu <[email protected]>
AuthorDate: Fri Jul 28 22:07:37 2023 +0800
[Fix-14008][registry] Fix etcd memory leak due to leaseId (#14034)
---
.../registry/etcd/EtcdKeepAliveLeaseManager.java | 79 ++++++++++++++++++++++
.../plugin/registry/etcd/EtcdRegistry.java | 8 ++-
.../etcd/EtcdKeepAliveLeaseManagerTest.java | 72 ++++++++++++++++++++
3 files changed, 156 insertions(+), 3 deletions(-)
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
new file mode 100644
index 0000000000..db34592bab
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.grpc.stub.StreamObserver;
+
+@Slf4j
+public class EtcdKeepAliveLeaseManager {
+
+ private final Map<String, Long> keyLeaseCache = new ConcurrentHashMap<>();
+
+ private final Client client;
+
+ EtcdKeepAliveLeaseManager(Client client) {
+ this.client = client;
+ }
+
+ long getOrCreateKeepAliveLease(String key, long timeToLive) {
+ return keyLeaseCache.computeIfAbsent(key, $ -> {
+ try {
+ long leaseId =
client.getLeaseClient().grant(timeToLive).get().getID();
+ client.getLeaseClient().keepAlive(leaseId, new
StreamObserver<LeaseKeepAliveResponse>() {
+
+ @Override
+ public void onNext(LeaseKeepAliveResponse value) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ log.error("Lease {} keep alive error, remove cache
with key:{}", leaseId, key, t);
+ keyLeaseCache.remove(key);
+ }
+
+ @Override
+ public void onCompleted() {
+ log.error("Lease {} keep alive complete, remove cache
with key:{}", leaseId, key);
+ keyLeaseCache.remove(key);
+ }
+ });
+ log.info("Lease {} keep alive create with key:{}", leaseId,
key);
+ return leaseId;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RegistryException("Failed to create lease key: " +
key, e);
+ } catch (ExecutionException e) {
+ throw new RegistryException("Failed to create lease key: " +
key, e);
+ }
+ });
+ }
+
+ Optional<Long> getKeepAliveLease(String key) {
+ return Optional.ofNullable(keyLeaseCache.get(key));
+ }
+}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
index 10d3dfc226..21e9253529 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
@@ -76,6 +76,9 @@ public class EtcdRegistry implements Registry {
private final Client client;
private EtcdConnectionStateListener etcdConnectionStateListener;
+
+ private EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
+
public static final String FOLDER_SEPARATOR = "/";
// save the lock info for thread
// key:lockKey Value:leaseId
@@ -120,6 +123,7 @@ public class EtcdRegistry implements Registry {
client = clientBuilder.build();
log.info("Started Etcd Registry...");
etcdConnectionStateListener = new EtcdConnectionStateListener(client);
+ etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
}
/**
@@ -206,9 +210,7 @@ public class EtcdRegistry implements Registry {
try {
if (deleteOnDisconnect) {
// keep the key by lease, if disconnected, the lease will
expire and the key will delete
- long leaseId =
client.getLeaseClient().grant(TIME_TO_LIVE_SECONDS).get().getID();
- client.getLeaseClient().keepAlive(leaseId,
Observers.observer(response -> {
- }));
+ long leaseId =
etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
PutOption putOption =
PutOption.newBuilder().withLeaseId(leaseId).build();
client.getKVClient().put(byteSequence(key),
byteSequence(value), putOption).get();
} else {
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
new file mode 100644
index 0000000000..70593e4bad
--- /dev/null
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.etcd;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.test.EtcdClusterExtension;
+
+/**
+ * Exercises {@link EtcdKeepAliveLeaseManager} against a single-node etcd cluster started for the test class.
+ */
+class EtcdKeepAliveLeaseManagerTest {
+
+    static EtcdClusterExtension server;
+
+    static Client client;
+
+    static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
+
+    @BeforeAll
+    public static void before() throws Exception {
+        server = EtcdClusterExtension.builder()
+                .withNodes(1)
+                .withImage("ibmcom/etcd:3.2.24")
+                .build();
+        server.restart();
+
+        client = Client.builder().endpoints(server.clientEndpoints()).build();
+
+        etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
+    }
+
+    /**
+     * The same key must yield the same lease, and revoking that lease must eventually evict it from the cache.
+     */
+    @Test
+    void getOrCreateKeepAliveLeaseTest() throws Exception {
+        long first = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
+        long second = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
+        Assertions.assertEquals(first, second);
+
+        client.getLeaseClient().revoke(first).get();
+
+        // The cache entry is removed asynchronously by the keep-alive observer, so poll with a
+        // deadline instead of relying on one fixed sleep being long enough.
+        long deadline = System.currentTimeMillis() + 10_000L;
+        Optional<Long> keepAliveLease = etcdKeepAliveLeaseManager.getKeepAliveLease("/test");
+        while (keepAliveLease.isPresent() && System.currentTimeMillis() < deadline) {
+            Thread.sleep(100);
+            keepAliveLease = etcdKeepAliveLeaseManager.getKeepAliveLease("/test");
+        }
+        Assertions.assertFalse(keepAliveLease.isPresent());
+    }
+
+    @AfterAll
+    public static void after() throws IOException {
+        // try-with-resources closes the cluster even if closing the client throws.
+        try (EtcdCluster closeServer = server.cluster()) {
+            client.close();
+        }
+    }
+}