This is an automated email from the ASF dual-hosted git repository.

carryxyh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 669705c  Fix FailbackClusterInvoker one risk of memory leak #2425 
(#2822)
669705c is described below

commit 669705c7d3fa69244cb3301af0f2cd1360609857
Author: 野菱 <[email protected]>
AuthorDate: Tue Dec 25 10:30:26 2018 +0800

    Fix FailbackClusterInvoker one risk of memory leak #2425 (#2822)
    
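    1. Limit the number of pending failback tasks; the default is 100, configurable via the key failbacktasks.
    2. Limit the number of retries per failed invocation; the default is 3, configurable via the key retries. Retries run every 5 seconds.
    3. Shut down the timer when the invoker is destroyed.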
---
 .../cluster/support/FailbackClusterInvoker.java    | 149 ++++++++++++++-------
 .../support/FailbackClusterInvokerTest.java        |  35 ++++-
 .../java/org/apache/dubbo/common/Constants.java    |   6 +
 .../src/main/resources/META-INF/compat/dubbo.xsd   |   5 +
 .../src/main/resources/META-INF/dubbo.xsd          |   5 +
 5 files changed, 143 insertions(+), 57 deletions(-)

diff --git 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
index b8ae095..1594557 100644
--- 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
+++ 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
@@ -16,26 +16,24 @@
  */
 package org.apache.dubbo.rpc.cluster.support;
 
+import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.common.timer.Timer;
+import org.apache.dubbo.common.timer.TimerTask;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcResult;
-import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.cluster.Directory;
 import org.apache.dubbo.rpc.cluster.LoadBalance;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,78 +41,125 @@ import java.util.concurrent.TimeUnit;
  * Especially useful for services of notification.
  *
  * <a href="http://en.wikipedia.org/wiki/Failback";>Failback</a>
- *
  */
 public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
 
     private static final Logger logger = 
LoggerFactory.getLogger(FailbackClusterInvoker.class);
 
-    private static final long RETRY_FAILED_PERIOD = 5 * 1000;
+    private static final long RETRY_FAILED_PERIOD = 5;
 
-    /**
-     * Use {@link NamedInternalThreadFactory} to produce {@link 
org.apache.dubbo.common.threadlocal.InternalThread}
-     * which with the use of {@link 
org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
-     */
-    private final ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(2,
-            new NamedInternalThreadFactory("failback-cluster-timer", true));
+    private final int retries;
 
-    private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed 
= new ConcurrentHashMap<>();
-    private volatile ScheduledFuture<?> retryFuture;
+    private final int failbackTasks;
+
+    private volatile Timer failTimer;
 
     public FailbackClusterInvoker(Directory<T> directory) {
         super(directory);
+
+        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, 
Constants.DEFAULT_FAILBACK_TIMES);
+        if (retriesConfig <= 0) {
+            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
+        }
+        int failbackTasksConfig = 
getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, 
Constants.DEFAULT_FAILBACK_TASKS);
+        if (failbackTasksConfig <= 0) {
+            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
+        }
+        retries = retriesConfig;
+        failbackTasks = failbackTasksConfig;
     }
 
-    private void addFailed(Invocation invocation, AbstractClusterInvoker<?> 
invoker) {
-        if (retryFuture == null) {
+    private void addFailed(LoadBalance loadbalance, Invocation invocation, 
List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
+        if (failTimer == null) {
             synchronized (this) {
-                if (retryFuture == null) {
-                    retryFuture = 
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
-
-                        @Override
-                        public void run() {
-                            // collect retry statistics
-                            try {
-                                retryFailed();
-                            } catch (Throwable t) { // Defensive fault 
tolerance
-                                logger.error("Unexpected error occur at 
collect statistic", t);
-                            }
-                        }
-                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, 
TimeUnit.MILLISECONDS);
+                if (failTimer == null) {
+                    failTimer = new HashedWheelTimer(
+                            new NamedThreadFactory("failback-cluster-timer", 
true),
+                            1,
+                            TimeUnit.SECONDS, 32, failbackTasks);
                 }
             }
         }
-        failed.put(invocation, invoker);
-    }
-
-    void retryFailed() {
-        if (failed.isEmpty()) {
-            return;
-        }
-        for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new 
HashMap<>(failed).entrySet()) {
-            Invocation invocation = entry.getKey();
-            Invoker<?> invoker = entry.getValue();
-            try {
-                invoker.invoke(invocation);
-                failed.remove(invocation);
-            } catch (Throwable e) {
-                logger.error("Failed retry to invoke method " + 
invocation.getMethodName() + ", waiting again.", e);
-            }
+        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, 
invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
+        try {
+            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, 
TimeUnit.SECONDS);
+        } catch (Throwable e) {
+            logger.error("Failback background works error,invocation->" + 
invocation + ", exception: " + e.getMessage());
         }
     }
 
     @Override
     protected Result doInvoke(Invocation invocation, List<Invoker<T>> 
invokers, LoadBalance loadbalance) throws RpcException {
+        Invoker<T> invoker = null;
         try {
             checkInvokers(invokers, invocation);
-            Invoker<T> invoker = select(loadbalance, invocation, invokers, 
null);
+            invoker = select(loadbalance, invocation, invokers, null);
             return invoker.invoke(invocation);
         } catch (Throwable e) {
             logger.error("Failback to invoke method " + 
invocation.getMethodName() + ", wait for retry in background. Ignored 
exception: "
                     + e.getMessage() + ", ", e);
-            addFailed(invocation, this);
+            addFailed(loadbalance, invocation, invokers, invoker);
             return new RpcResult(); // ignore
         }
     }
 
+    @Override
+    public void destroy() {
+        super.destroy();
+        if (failTimer != null) {
+            failTimer.stop();
+        }
+    }
+
+    /**
+     * RetryTimerTask
+     */
+    private class RetryTimerTask implements TimerTask {
+        private final Invocation invocation;
+        private final LoadBalance loadbalance;
+        private final List<Invoker<T>> invokers;
+        private final List<Invoker<T>> lastInvokers = new ArrayList<>();
+        private final int retries;
+        private final long tick;
+        private int retryTimes = 0;
+
+        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, 
List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
+            this.loadbalance = loadbalance;
+            this.invocation = invocation;
+            this.invokers = invokers;
+            this.retries = retries;
+            this.tick = tick;
+            lastInvokers.add(lastInvoker);
+        }
+
+        @Override
+        public void run(Timeout timeout) {
+            try {
+                Invoker<T> retryInvoker = select(loadbalance, invocation, 
invokers, lastInvokers);
+                lastInvokers.clear();
+                lastInvokers.add(retryInvoker);
+                retryInvoker.invoke(invocation);
+            } catch (Throwable e) {
+                logger.error("Failed retry to invoke method " + 
invocation.getMethodName() + ", waiting again.", e);
+                if ((++retryTimes) >= retries) {
+                    logger.error("Failed retry times exceed threshold (" + 
retries + "), We have to abandon, invocation->" + invocation);
+                } else {
+                    rePut(timeout);
+                }
+            }
+        }
+
+        private void rePut(Timeout timeout) {
+            if (timeout == null) {
+                return;
+            }
+
+            Timer timer = timeout.timer();
+            if (timer.isStop() || timeout.isCancelled()) {
+                return;
+            }
+
+            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
 
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
index 42a4ec6..581d52e 100644
--- 
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
+++ 
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.cluster.support;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.DubboAppender;
 import org.apache.dubbo.common.utils.LogUtil;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
@@ -25,22 +26,32 @@ import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.RpcResult;
 import org.apache.dubbo.rpc.cluster.Directory;
 
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 
+ /**
+  * FailbackClusterInvokerTest
+  *
+  * add annotation @FixMethodOrder, the testARetryFailed Method must to first 
execution
+ */
 @SuppressWarnings("unchecked")
+@FixMethodOrder(org.junit.runners.MethodSorters.NAME_ASCENDING)
 public class FailbackClusterInvokerTest {
 
     List<Invoker<FailbackClusterInvokerTest>> invokers = new 
ArrayList<Invoker<FailbackClusterInvokerTest>>();
-    URL url = URL.valueOf("test://test:11/test");
+    URL url = URL.valueOf("test://test:11/test?retries=2&failbacktasks=2");
     Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
     RpcInvocation invocation = new RpcInvocation();
     Directory<FailbackClusterInvokerTest> dic;
@@ -76,16 +87,17 @@ public class FailbackClusterInvokerTest {
     }
 
     @Test
-    public void testInvokeExceptoin() {
+    public void testInvokeException() {
         resetInvokerToException();
         FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new 
FailbackClusterInvoker<FailbackClusterInvokerTest>(
                 dic);
         invoker.invoke(invocation);
         Assert.assertNull(RpcContext.getContext().getInvoker());
+        DubboAppender.clear();
     }
 
     @Test()
-    public void testInvokeNoExceptoin() {
+    public void testInvokeNoException() {
 
         resetInvokerToNoException();
 
@@ -112,21 +124,34 @@ public class FailbackClusterInvokerTest {
         FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new 
FailbackClusterInvoker<FailbackClusterInvokerTest>(
                 dic);
         LogUtil.start();
+        DubboAppender.clear();
         invoker.invoke(invocation);
         assertEquals(1, LogUtil.findMessage("Failback to invoke"));
         LogUtil.stop();
     }
 
     @Test()
-    public void testRetryFailed() {
+    public void testARetryFailed() throws Exception {
+        //Test retries and
 
         resetInvokerToException();
 
         FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new 
FailbackClusterInvoker<FailbackClusterInvokerTest>(
                 dic);
+        LogUtil.start();
+        DubboAppender.clear();
+        invoker.invoke(invocation);
+        invoker.invoke(invocation);
         invoker.invoke(invocation);
         Assert.assertNull(RpcContext.getContext().getInvoker());
-        invoker.retryFailed();// when retry the invoker which get from failed 
map already is not the mocked invoker,so
+//        invoker.retryFailed();// when retry the invoker which get from 
failed map already is not the mocked invoker,so
+        //Ensure that the main thread is online
+        CountDownLatch countDown = new CountDownLatch(1);
+        countDown.await(15000L, TimeUnit.MILLISECONDS);
+        LogUtil.stop();
+        Assert.assertEquals("must have four error message ", 4, 
LogUtil.findMessage(Level.ERROR, "Failed retry to invoke method"));
+        Assert.assertEquals("must have two error message ", 2, 
LogUtil.findMessage(Level.ERROR, "Failed retry times exceed threshold"));
+        Assert.assertEquals("must have one error message ", 1, 
LogUtil.findMessage(Level.ERROR, "Failback background works error"));
         // it can be invoke successfully
     }
 }
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index 1750e31..da75ebd 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -159,6 +159,10 @@ public class Constants {
 
     public static final int DEFAULT_RETRIES = 2;
 
+    public static final int DEFAULT_FAILBACK_TASKS = 100;
+
+    public static final int DEFAULT_FAILBACK_TIMES = 3;
+
     // default buffer size is 8k.
     public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
 
@@ -300,6 +304,8 @@ public class Constants {
 
     public static final String RETRIES_KEY = "retries";
 
+    public static final String FAIL_BACK_TASKS_KEY = "failbacktasks";
+
     public static final String PROMPT_KEY = "prompt";
 
     public static final String DEFAULT_PROMPT = "dubbo>";
diff --git 
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd 
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
index cc20450..e3253e4 100644
--- 
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
+++ 
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
@@ -25,6 +25,11 @@
                 <xsd:documentation><![CDATA[ The method retry times. 
]]></xsd:documentation>
             </xsd:annotation>
         </xsd:attribute>
+        <xsd:attribute name="failbacktasks" type="xsd:string">
+            <xsd:annotation>
+                <xsd:documentation><![CDATA[ The max failback tasks capacity 
size. ]]></xsd:documentation>
+            </xsd:annotation>
+        </xsd:attribute>
         <xsd:attribute name="actives" type="xsd:string">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ The max active requests. 
]]></xsd:documentation>
diff --git 
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd 
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 5118653..b943684 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -25,6 +25,11 @@
                 <xsd:documentation><![CDATA[ The method retry times. 
]]></xsd:documentation>
             </xsd:annotation>
         </xsd:attribute>
+        <xsd:attribute name="failbacktasks" type="xsd:string">
+            <xsd:annotation>
+                <xsd:documentation><![CDATA[ The max failback tasks capacity 
size. ]]></xsd:documentation>
+            </xsd:annotation>
+        </xsd:attribute>
         <xsd:attribute name="actives" type="xsd:string">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ The max active requests. 
]]></xsd:documentation>

Reply via email to