This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5463d0255a Fix rpc channel leak due to concurrent operation (#16021)
5463d0255a is described below
commit 5463d0255a71212668606588c10189c2fb0185d2
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 20 12:10:33 2024 +0800
Fix rpc channel leak due to concurrent operation (#16021)
* Fix rpc channel leak due to concurrent operation
* Throw channel create failed exception
---------
Co-authored-by: Rick Cheng <[email protected]>
---
.../extract/base/client/NettyRemotingClient.java | 60 ++++++++++++++--------
.../extract/base/utils/HostTest.java | 31 +++++++++++
2 files changed, 70 insertions(+), 21 deletions(-)
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index 3999f5c9f5..dafaae311d 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -31,10 +31,12 @@ import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.Bootstrap;
@@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable {
private final Bootstrap bootstrap = new Bootstrap();
- private final ConcurrentHashMap<Host, Channel> channels = new
ConcurrentHashMap<>(128);
+ private final ReentrantLock channelsLock = new ReentrantLock();
+ private final Map<Host, Channel> channels = new ConcurrentHashMap<>();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -104,9 +107,10 @@ public class NettyRemotingClient implements AutoCloseable {
isStarted.compareAndSet(false, true);
}
- public IRpcResponse sendSync(final Host host, final Transporter
transporter,
+ public IRpcResponse sendSync(final Host host,
+ final Transporter transporter,
final long timeoutMillis) throws
InterruptedException, RemotingException {
- final Channel channel = getChannel(host);
+ final Channel channel = getOrCreateChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail",
host));
}
@@ -137,36 +141,43 @@ public class NettyRemotingClient implements AutoCloseable
{
return iRpcResponse;
}
- private Channel getChannel(Host host) {
+ private Channel getOrCreateChannel(Host host) {
Channel channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
}
- return createChannel(host, true);
+ try {
+ channelsLock.lock();
+ channel = channels.get(host);
+ if (channel != null && channel.isActive()) {
+ return channel;
+ }
+ channel = createChannel(host);
+ channels.put(host, channel);
+ } finally {
+ channelsLock.unlock();
+ }
+ return channel;
}
/**
* create channel
*
- * @param host host
- * @param isSync sync flag
+ * @param host host
* @return channel
*/
- private Channel createChannel(Host host, boolean isSync) {
+ private Channel createChannel(Host host) {
try {
ChannelFuture future;
synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(),
host.getPort()));
}
- if (isSync) {
- future.sync();
- }
+ future.await(clientConfig.getConnectTimeoutMillis());
if (future.isSuccess()) {
- Channel channel = future.channel();
- channels.put(host, channel);
- return channel;
+ return future.channel();
+ } else {
+ throw new IllegalArgumentException("connect to host: " + host
+ " failed", future.cause());
}
- throw new IllegalArgumentException("connect to host: " + host + "
failed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Connect to host: " + host + " failed",
e);
@@ -189,16 +200,23 @@ public class NettyRemotingClient implements AutoCloseable
{
}
private void closeChannels() {
- for (Channel channel : this.channels.values()) {
- channel.close();
+ try {
+ channelsLock.lock();
+ channels.values().forEach(Channel::close);
+ } finally {
+ channelsLock.unlock();
}
- this.channels.clear();
}
public void closeChannel(Host host) {
- Channel channel = this.channels.remove(host);
- if (channel != null) {
- channel.close();
+ try {
+ channelsLock.lock();
+ Channel channel = this.channels.remove(host);
+ if (channel != null) {
+ channel.close();
+ }
+ } finally {
+ channelsLock.unlock();
}
}
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java
new file mode 100644
index 0000000000..5bd1a0a0ba
--- /dev/null
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/utils/HostTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.utils;
+
+import org.junit.jupiter.api.Test;
+
+import com.google.common.truth.Truth;
+
+class HostTest {
+
+ @Test
+ void testEquals() {
+
Truth.assertThat(Host.of("localhost:8080")).isEqualTo(Host.of("localhost:8080"));
+ }
+
+}