This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 303a5af Fix mem leak caused by duplicate reconnect task (#7537)
303a5af is described below
commit 303a5aff45e38249c00603a3aebad1b04489f648
Author: GuoHao <[email protected]>
AuthorDate: Tue Apr 13 15:45:36 2021 +0800
Fix mem leak caused by duplicate reconnect task (#7537)
---
.../org/apache/dubbo/remoting/api/Connection.java | 56 +++++++++++++++++-----
.../dubbo/remoting/api/ConnectionHandler.java | 16 ++++++-
.../dubbo/remoting/api/ConnectionListener.java | 48 -------------------
3 files changed, 59 insertions(+), 61 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
index 24833aa..5b7d5d3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
@@ -29,9 +29,11 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.AttributeKey;
@@ -42,6 +44,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -61,6 +64,8 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicReference<Channel> channel = new AtomicReference<>();
private final ChannelFuture initPromise;
+ private final Bootstrap bootstrap;
+ private final ConnectionListener connectionListener = new ConnectionListener();
public Connection(URL url) {
url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
@@ -70,6 +75,7 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
this.connectTimeout = Math.max(3000,
url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY,
Constants.DEFAULT_CONNECT_TIMEOUT));
this.closeFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
this.remote = getConnectAddress();
+ this.bootstrap = create();
this.initPromise = connect();
}
@@ -81,11 +87,7 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
return closeFuture;
}
-
- public ChannelFuture connect() {
- if (isClosed()) {
- return null;
- }
+ private Bootstrap create() {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
@@ -112,8 +114,19 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
// TODO support Socks5
}
});
+ return bootstrap;
+ }
+
+ public ChannelFuture connect() {
+ if (isClosed()) {
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("%s aborted to reconnect cause connection closed. ", Connection.this));
+ }
+ return null;
+ }
+
final ChannelFuture promise = bootstrap.connect();
- promise.addListener(new ConnectionListener(this));
+ promise.addListener(this.connectionListener);
return promise;
}
@@ -123,13 +136,13 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
@Override
public String toString() {
- return "(Ref=" + ReferenceCountUtil.refCnt(this) + ",local=" + (getChannel() == null ? null : getChannel().localAddress()) + ",remote=" + getRemote();
+ return super.toString() + " (Ref=" + ReferenceCountUtil.refCnt(this) + ",local=" + (getChannel() == null ? null : getChannel().localAddress()) + ",remote=" + getRemote();
}
public void onGoaway(Channel channel) {
if (this.channel.compareAndSet(channel, null)) {
if (logger.isInfoEnabled()) {
- logger.info(String.format("Connection:%s goaway", this));
+ logger.info(String.format("%s goaway", this));
}
}
}
@@ -138,7 +151,7 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
this.channel.set(channel);
channel.attr(CONNECTION).set(this);
if (logger.isInfoEnabled()) {
- logger.info(String.format("Connection:%s connected ", this));
+ logger.info(String.format("%s connected ", this));
}
}
@@ -175,6 +188,9 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
}
public void close() {
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("Connection:%s freed ", this));
+ }
final Channel current = this.channel.get();
if (current != null) {
current.close();
@@ -200,8 +216,26 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
return url;
}
- private int getConnectTimeout() {
- return connectTimeout;
+ class ConnectionListener implements ChannelFutureListener {
+
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ return;
+ }
+ final Connection conn = Connection.this;
+ if (conn.isClosed() || conn.refCnt() == 0) {
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("%s aborted to reconnect. %s", conn, future.cause().getMessage()));
+ }
+ return;
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("%s is reconnecting, attempt=%d cause=%s", conn, 0, future.cause().getMessage()));
+ }
+ final EventLoop loop = future.channel().eventLoop();
+ loop.schedule((Runnable) conn::connect, 1L, TimeUnit.SECONDS);
+ }
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
index 47dca24..bdb27b7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.util.concurrent.TimeUnit;
@@ -39,10 +40,18 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
}
public void onGoAway(Channel channel) {
- channel.attr(GO_AWAY_KEY).set(true);
+ final Attribute<Boolean> attr = channel.attr(GO_AWAY_KEY);
+ if (Boolean.TRUE.equals(attr.get())) {
+ return;
+ }
+
+ attr.set(true);
if (connection != null) {
connection.onGoaway(channel);
}
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Channel %s go away ,schedule reconnect", channel));
+ }
reconnect(channel);
}
@@ -65,8 +74,11 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- reconnect(ctx.channel());
super.channelInactive(ctx);
+ final Attribute<Boolean> goawayAttr = ctx.channel().attr(GO_AWAY_KEY);
+ if (!Boolean.TRUE.equals(goawayAttr.get())) {
+ reconnect(ctx.channel());
+ }
}
private void reconnect(Channel channel) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionListener.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionListener.java
deleted file mode 100644
index a379ce0..0000000
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dubbo.remoting.api;
-
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-
-import java.util.concurrent.TimeUnit;
-
-public class ConnectionListener implements ChannelFutureListener {
- private static final Logger log = LoggerFactory.getLogger(ConnectionListener.class);
-
- private final Connection connection;
-
- public ConnectionListener(Connection connection) {
- this.connection = connection;
- }
-
- @Override
- public void operationComplete(ChannelFuture channelFuture) {
- if (!channelFuture.isSuccess()) {
- if (log.isInfoEnabled()) {
- log.info(String.format("Connection %s is reconnecting, attempt=%d", connection, 0));
- }
- final EventLoop loop = channelFuture.channel().eventLoop();
- loop.schedule((Runnable) connection::connect, 1L, TimeUnit.SECONDS);
- }
- }
-}
\ No newline at end of file