This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 303a5af  Fix mem leak caused by duplicate reconnect task (#7537)
303a5af is described below

commit 303a5aff45e38249c00603a3aebad1b04489f648
Author: GuoHao <[email protected]>
AuthorDate: Tue Apr 13 15:45:36 2021 +0800

    Fix mem leak caused by duplicate reconnect task (#7537)
---
 .../org/apache/dubbo/remoting/api/Connection.java  | 56 +++++++++++++++++-----
 .../dubbo/remoting/api/ConnectionHandler.java      | 16 ++++++-
 .../dubbo/remoting/api/ConnectionListener.java     | 48 -------------------
 3 files changed, 59 insertions(+), 61 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
index 24833aa..5b7d5d3 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/Connection.java
@@ -29,9 +29,11 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.AttributeKey;
@@ -42,6 +44,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -61,6 +64,8 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicReference<Channel> channel = new AtomicReference<>();
     private final ChannelFuture initPromise;
+    private final Bootstrap bootstrap;
+    private final ConnectionListener connectionListener = new ConnectionListener();
 
     public Connection(URL url) {
         url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
@@ -70,6 +75,7 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
         this.connectTimeout = Math.max(3000, url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
         this.closeFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
         this.remote = getConnectAddress();
+        this.bootstrap = create();
         this.initPromise = connect();
     }
 
@@ -81,11 +87,7 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
         return closeFuture;
     }
 
-
-    public ChannelFuture connect() {
-        if (isClosed()) {
-            return null;
-        }
+    private Bootstrap create() {
         final Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP)
                 .option(ChannelOption.SO_KEEPALIVE, true)
@@ -112,8 +114,19 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
                 // TODO support Socks5
             }
         });
+        return bootstrap;
+    }
+
+    public ChannelFuture connect() {
+        if (isClosed()) {
+            if (logger.isInfoEnabled()) {
+                logger.info(String.format("%s aborted to reconnect cause connection closed. ", Connection.this));
+            }
+            return null;
+        }
+
         final ChannelFuture promise = bootstrap.connect();
-        promise.addListener(new ConnectionListener(this));
+        promise.addListener(this.connectionListener);
         return promise;
     }
 
@@ -123,13 +136,13 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
 
     @Override
     public String toString() {
-        return "(Ref=" + ReferenceCountUtil.refCnt(this) + ",local=" + (getChannel() == null ? null : getChannel().localAddress()) + ",remote=" + getRemote();
+        return super.toString() + " (Ref=" + ReferenceCountUtil.refCnt(this) + ",local=" + (getChannel() == null ? null : getChannel().localAddress()) + ",remote=" + getRemote();
     }
 
     public void onGoaway(Channel channel) {
         if (this.channel.compareAndSet(channel, null)) {
             if (logger.isInfoEnabled()) {
-                logger.info(String.format("Connection:%s  goaway", this));
+                logger.info(String.format("%s goaway", this));
             }
         }
     }
@@ -138,7 +151,7 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
         this.channel.set(channel);
         channel.attr(CONNECTION).set(this);
         if (logger.isInfoEnabled()) {
-            logger.info(String.format("Connection:%s connected ", this));
+            logger.info(String.format("%s connected ", this));
         }
     }
 
@@ -175,6 +188,9 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
     }
 
     public void close() {
+        if (logger.isInfoEnabled()) {
+            logger.info(String.format("Connection:%s freed ", this));
+        }
         final Channel current = this.channel.get();
         if (current != null) {
             current.close();
@@ -200,8 +216,26 @@ public class Connection extends AbstractReferenceCounted implements ReferenceCou
         return url;
     }
 
-    private int getConnectTimeout() {
-        return connectTimeout;
+    class ConnectionListener implements ChannelFutureListener {
+
+        @Override
+        public void operationComplete(ChannelFuture future) {
+            if (future.isSuccess()) {
+                return;
+            }
+            final Connection conn = Connection.this;
+            if (conn.isClosed() || conn.refCnt() == 0) {
+                if (logger.isInfoEnabled()) {
+                    logger.info(String.format("%s aborted to reconnect. %s", conn, future.cause().getMessage()));
+                }
+                return;
+            }
+            if (logger.isInfoEnabled()) {
+                logger.info(String.format("%s is reconnecting, attempt=%d cause=%s", conn, 0, future.cause().getMessage()));
+            }
+            final EventLoop loop = future.channel().eventLoop();
+            loop.schedule((Runnable) conn::connect, 1L, TimeUnit.SECONDS);
+        }
     }
 }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
index 47dca24..bdb27b7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionHandler.java
@@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 
 import java.util.concurrent.TimeUnit;
@@ -39,10 +40,18 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
     }
 
     public void onGoAway(Channel channel) {
-        channel.attr(GO_AWAY_KEY).set(true);
+        final Attribute<Boolean> attr = channel.attr(GO_AWAY_KEY);
+        if (Boolean.TRUE.equals(attr.get())) {
+            return;
+        }
+
+        attr.set(true);
         if (connection != null) {
             connection.onGoaway(channel);
         }
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Channel %s go away ,schedule reconnect", channel));
+        }
         reconnect(channel);
     }
 
@@ -65,8 +74,11 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        reconnect(ctx.channel());
         super.channelInactive(ctx);
+        final Attribute<Boolean> goawayAttr = ctx.channel().attr(GO_AWAY_KEY);
+        if (!Boolean.TRUE.equals(goawayAttr.get())) {
+            reconnect(ctx.channel());
+        }
     }
 
     private void reconnect(Channel channel) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionListener.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionListener.java
deleted file mode 100644
index a379ce0..0000000
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/ConnectionListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dubbo.remoting.api;
-
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-
-import java.util.concurrent.TimeUnit;
-
-public class ConnectionListener implements ChannelFutureListener {
-    private static final Logger log = LoggerFactory.getLogger(ConnectionListener.class);
-
-    private final Connection connection;
-
-    public ConnectionListener(Connection connection) {
-        this.connection = connection;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture channelFuture) {
-        if (!channelFuture.isSuccess()) {
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Connection %s is reconnecting, attempt=%d", connection, 0));
-            }
-            final EventLoop loop = channelFuture.channel().eventLoop();
-            loop.schedule((Runnable) connection::connect, 1L, TimeUnit.SECONDS);
-        }
-    }
-}
\ No newline at end of file

Reply via email to