This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5463d0255a Fix rpc channel leak due to concurrent operation (#16021)
5463d0255a is described below

commit 5463d0255a71212668606588c10189c2fb0185d2
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 20 12:10:33 2024 +0800

    Fix rpc channel leak due to concurrent operation (#16021)
    
    * Fix rpc channel leak due to concurrent operation
    
    * Throw channel create failed exception
    
    ---------
    
    Co-authored-by: Rick Cheng <[email protected]>
---
 .../extract/base/client/NettyRemotingClient.java   | 60 ++++++++++++++--------
 .../extract/base/utils/HostTest.java               | 31 +++++++++++
 2 files changed, 70 insertions(+), 21 deletions(-)

diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index 3999f5c9f5..dafaae311d 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -31,10 +31,12 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import lombok.extern.slf4j.Slf4j;
 import io.netty.bootstrap.Bootstrap;
@@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable {
 
     private final Bootstrap bootstrap = new Bootstrap();
 
-    private final ConcurrentHashMap<Host, Channel> channels = new 
ConcurrentHashMap<>(128);
+    private final ReentrantLock channelsLock = new ReentrantLock();
+    private final Map<Host, Channel> channels = new ConcurrentHashMap<>();
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -104,9 +107,10 @@ public class NettyRemotingClient implements AutoCloseable {
         isStarted.compareAndSet(false, true);
     }
 
-    public IRpcResponse sendSync(final Host host, final Transporter 
transporter,
+    public IRpcResponse sendSync(final Host host,
+                                 final Transporter transporter,
                                  final long timeoutMillis) throws 
InterruptedException, RemotingException {
-        final Channel channel = getChannel(host);
+        final Channel channel = getOrCreateChannel(host);
         if (channel == null) {
             throw new RemotingException(String.format("connect to : %s fail", 
host));
         }
@@ -137,36 +141,43 @@ public class NettyRemotingClient implements AutoCloseable 
{
         return iRpcResponse;
     }
 
-    private Channel getChannel(Host host) {
+    private Channel getOrCreateChannel(Host host) {
         Channel channel = channels.get(host);
         if (channel != null && channel.isActive()) {
             return channel;
         }
-        return createChannel(host, true);
+        try {
+            channelsLock.lock();
+            channel = channels.get(host);
+            if (channel != null && channel.isActive()) {
+                return channel;
+            }
+            channel = createChannel(host);
+            channels.put(host, channel);
+        } finally {
+            channelsLock.unlock();
+        }
+        return channel;
     }
 
     /**
      * create channel
      *
-     * @param host   host
-     * @param isSync sync flag
+     * @param host host
      * @return channel
      */
-    private Channel createChannel(Host host, boolean isSync) {
+    private Channel createChannel(Host host) {
         try {
             ChannelFuture future;
             synchronized (bootstrap) {
                 future = bootstrap.connect(new InetSocketAddress(host.getIp(), 
host.getPort()));
             }
-            if (isSync) {
-                future.sync();
-            }
+            future.await(clientConfig.getConnectTimeoutMillis());
             if (future.isSuccess()) {
-                Channel channel = future.channel();
-                channels.put(host, channel);
-                return channel;
+                return future.channel();
+            } else {
+                throw new IllegalArgumentException("connect to host: " + host 
+ " failed", future.cause());
             }
-            throw new IllegalArgumentException("connect to host: " + host + " 
failed");
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RuntimeException("Connect to host: " + host + " failed", 
e);
@@ -189,16 +200,23 @@ public class NettyRemotingClient implements AutoCloseable 
{
     }
 
     private void closeChannels() {
-        for (Channel channel : this.channels.values()) {
-            channel.close();
+        try {
+            channelsLock.lock();
+            channels.values().forEach(Channel::close);
+        } finally {
+            channelsLock.unlock();
         }
-        this.channels.clear();
     }
 
     public void closeChannel(Host host) {
-        Channel channel = this.channels.remove(host);
-        if (channel != null) {
-            channel.close();
+        try {
+            channelsLock.lock();
+            Channel channel = this.channels.remove(host);
+            if (channel != null) {
+                channel.close();
+            }
+        } finally {
+            channelsLock.unlock();
         }
     }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java
new file mode 100644
index 0000000000..5bd1a0a0ba
--- /dev/null
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.utils;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.truth.Truth;
+
+class HostTest {
+
+    @Test
+    void testEquals() {
+        
Truth.assertThat(Host.of("localhost:8080")).isEqualTo(Host.of("localhost:8080"));
+    }
+
+}

Reply via email to