This is an automated email from the ASF dual-hosted git repository.
carryxyh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 669705c Fix FailbackClusterInvoker one risk of memory leak #2425
(#2822)
669705c is described below
commit 669705c7d3fa69244cb3301af0f2cd1360609857
Author: 野菱 <[email protected]>
AuthorDate: Tue Dec 25 10:30:26 2018 +0800
Fix FailbackClusterInvoker one risk of memory leak #2425 (#2822)
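1. Limit the number of pending failback retry tasks (default 100); configurable via the key failbacktasks.
2. Limit the number of retries per failed invocation (default 3); configurable via the key retries. Retries are scheduled every 5 seconds.
3. Stop the retry timer when the invoker is destroyed.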
---
.../cluster/support/FailbackClusterInvoker.java | 149 ++++++++++++++-------
.../support/FailbackClusterInvokerTest.java | 35 ++++-
.../java/org/apache/dubbo/common/Constants.java | 6 +
.../src/main/resources/META-INF/compat/dubbo.xsd | 5 +
.../src/main/resources/META-INF/dubbo.xsd | 5 +
5 files changed, 143 insertions(+), 57 deletions(-)
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
index b8ae095..1594557 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java
@@ -16,26 +16,24 @@
*/
package org.apache.dubbo.rpc.cluster.support;
+import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.timer.Timeout;
+import org.apache.dubbo.common.timer.Timer;
+import org.apache.dubbo.common.timer.TimerTask;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -43,78 +41,125 @@ import java.util.concurrent.TimeUnit;
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
- *
*/
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger =
LoggerFactory.getLogger(FailbackClusterInvoker.class);
- private static final long RETRY_FAILED_PERIOD = 5 * 1000;
+ private static final long RETRY_FAILED_PERIOD = 5;
- /**
- * Use {@link NamedInternalThreadFactory} to produce {@link
org.apache.dubbo.common.threadlocal.InternalThread}
- * which with the use of {@link
org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
- */
- private final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2,
- new NamedInternalThreadFactory("failback-cluster-timer", true));
+ private final int retries;
- private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed
= new ConcurrentHashMap<>();
- private volatile ScheduledFuture<?> retryFuture;
+ private final int failbackTasks;
+
+ private volatile Timer failTimer;
public FailbackClusterInvoker(Directory<T> directory) {
super(directory);
+
+ int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY,
Constants.DEFAULT_FAILBACK_TIMES);
+ if (retriesConfig <= 0) {
+ retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
+ }
+ int failbackTasksConfig =
getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY,
Constants.DEFAULT_FAILBACK_TASKS);
+ if (failbackTasksConfig <= 0) {
+ failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
+ }
+ retries = retriesConfig;
+ failbackTasks = failbackTasksConfig;
}
- private void addFailed(Invocation invocation, AbstractClusterInvoker<?>
invoker) {
- if (retryFuture == null) {
+ private void addFailed(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
+ if (failTimer == null) {
synchronized (this) {
- if (retryFuture == null) {
- retryFuture =
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
-
- @Override
- public void run() {
- // collect retry statistics
- try {
- retryFailed();
- } catch (Throwable t) { // Defensive fault
tolerance
- logger.error("Unexpected error occur at
collect statistic", t);
- }
- }
- }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD,
TimeUnit.MILLISECONDS);
+ if (failTimer == null) {
+ failTimer = new HashedWheelTimer(
+ new NamedThreadFactory("failback-cluster-timer",
true),
+ 1,
+ TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
- failed.put(invocation, invoker);
- }
-
- void retryFailed() {
- if (failed.isEmpty()) {
- return;
- }
- for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new
HashMap<>(failed).entrySet()) {
- Invocation invocation = entry.getKey();
- Invoker<?> invoker = entry.getValue();
- try {
- invoker.invoke(invocation);
- failed.remove(invocation);
- } catch (Throwable e) {
- logger.error("Failed retry to invoke method " +
invocation.getMethodName() + ", waiting again.", e);
- }
+ RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance,
invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
+ try {
+ failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD,
TimeUnit.SECONDS);
+ } catch (Throwable e) {
+ logger.error("Failback background works error,invocation->" +
invocation + ", exception: " + e.getMessage());
}
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>>
invokers, LoadBalance loadbalance) throws RpcException {
+ Invoker<T> invoker = null;
try {
checkInvokers(invokers, invocation);
- Invoker<T> invoker = select(loadbalance, invocation, invokers,
null);
+ invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " +
invocation.getMethodName() + ", wait for retry in background. Ignored
exception: "
+ e.getMessage() + ", ", e);
- addFailed(invocation, this);
+ addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
}
}
+ @Override
+ public void destroy() {
+ super.destroy();
+ if (failTimer != null) {
+ failTimer.stop();
+ }
+ }
+
+ /**
+ * RetryTimerTask
+ */
+ private class RetryTimerTask implements TimerTask {
+ private final Invocation invocation;
+ private final LoadBalance loadbalance;
+ private final List<Invoker<T>> invokers;
+ private final List<Invoker<T>> lastInvokers = new ArrayList<>();
+ private final int retries;
+ private final long tick;
+ private int retryTimes = 0;
+
+ RetryTimerTask(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
+ this.loadbalance = loadbalance;
+ this.invocation = invocation;
+ this.invokers = invokers;
+ this.retries = retries;
+ this.tick = tick;
+ lastInvokers.add(lastInvoker);
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ try {
+ Invoker<T> retryInvoker = select(loadbalance, invocation,
invokers, lastInvokers);
+ lastInvokers.clear();
+ lastInvokers.add(retryInvoker);
+ retryInvoker.invoke(invocation);
+ } catch (Throwable e) {
+ logger.error("Failed retry to invoke method " +
invocation.getMethodName() + ", waiting again.", e);
+ if ((++retryTimes) >= retries) {
+ logger.error("Failed retry times exceed threshold (" +
retries + "), We have to abandon, invocation->" + invocation);
+ } else {
+ rePut(timeout);
+ }
+ }
+ }
+
+ private void rePut(Timeout timeout) {
+ if (timeout == null) {
+ return;
+ }
+
+ Timer timer = timeout.timer();
+ if (timer.isStop() || timeout.isCancelled()) {
+ return;
+ }
+
+ timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
index 42a4ec6..581d52e 100644
---
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
+++
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
@@ -25,22 +26,32 @@ import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
+import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.FixMethodOrder;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
+ /**
+ * FailbackClusterInvokerTest
+ *
+ * @FixMethodOrder makes testARetryFailed run first, because it asserts exact counts of ERROR log messages.
+ */
@SuppressWarnings("unchecked")
+@FixMethodOrder(org.junit.runners.MethodSorters.NAME_ASCENDING)
public class FailbackClusterInvokerTest {
List<Invoker<FailbackClusterInvokerTest>> invokers = new
ArrayList<Invoker<FailbackClusterInvokerTest>>();
- URL url = URL.valueOf("test://test:11/test");
+ URL url = URL.valueOf("test://test:11/test?retries=2&failbacktasks=2");
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailbackClusterInvokerTest> dic;
@@ -76,16 +87,17 @@ public class FailbackClusterInvokerTest {
}
@Test
- public void testInvokeExceptoin() {
+ public void testInvokeException() {
resetInvokerToException();
FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new
FailbackClusterInvoker<FailbackClusterInvokerTest>(
dic);
invoker.invoke(invocation);
Assert.assertNull(RpcContext.getContext().getInvoker());
+ DubboAppender.clear();
}
@Test()
- public void testInvokeNoExceptoin() {
+ public void testInvokeNoException() {
resetInvokerToNoException();
@@ -112,21 +124,34 @@ public class FailbackClusterInvokerTest {
FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new
FailbackClusterInvoker<FailbackClusterInvokerTest>(
dic);
LogUtil.start();
+ DubboAppender.clear();
invoker.invoke(invocation);
assertEquals(1, LogUtil.findMessage("Failback to invoke"));
LogUtil.stop();
}
@Test()
- public void testRetryFailed() {
+ public void testARetryFailed() throws Exception {
+ // Test the retry limit (retries=2) and the failback task capacity (failbacktasks=2) configured in the URL
resetInvokerToException();
FailbackClusterInvoker<FailbackClusterInvokerTest> invoker = new
FailbackClusterInvoker<FailbackClusterInvokerTest>(
dic);
+ LogUtil.start();
+ DubboAppender.clear();
+ invoker.invoke(invocation);
+ invoker.invoke(invocation);
invoker.invoke(invocation);
Assert.assertNull(RpcContext.getContext().getInvoker());
- invoker.retryFailed();// when retry the invoker which get from failed
map already is not the mocked invoker,so
+// invoker.retryFailed();// when retry the invoker which get from
failed map already is not the mocked invoker,so
+ // Block for up to 15 seconds (the latch is never counted down) so the background retries can run
+ CountDownLatch countDown = new CountDownLatch(1);
+ countDown.await(15000L, TimeUnit.MILLISECONDS);
+ LogUtil.stop();
+ Assert.assertEquals("must have four error message ", 4,
LogUtil.findMessage(Level.ERROR, "Failed retry to invoke method"));
+ Assert.assertEquals("must have two error message ", 2,
LogUtil.findMessage(Level.ERROR, "Failed retry times exceed threshold"));
+ Assert.assertEquals("must have one error message ", 1,
LogUtil.findMessage(Level.ERROR, "Failback background works error"));
// it can be invoke successfully
}
}
\ No newline at end of file
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index 1750e31..da75ebd 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -159,6 +159,10 @@ public class Constants {
public static final int DEFAULT_RETRIES = 2;
+ public static final int DEFAULT_FAILBACK_TASKS = 100;
+
+ public static final int DEFAULT_FAILBACK_TIMES = 3;
+
// default buffer size is 8k.
public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;
@@ -300,6 +304,8 @@ public class Constants {
public static final String RETRIES_KEY = "retries";
+ public static final String FAIL_BACK_TASKS_KEY = "failbacktasks";
+
public static final String PROMPT_KEY = "prompt";
public static final String DEFAULT_PROMPT = "dubbo>";
diff --git
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
index cc20450..e3253e4 100644
---
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
+++
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
@@ -25,6 +25,11 @@
<xsd:documentation><![CDATA[ The method retry times.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="failbacktasks" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ The maximum number of pending failback retry tasks. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="actives" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The max active requests.
]]></xsd:documentation>
diff --git
a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 5118653..b943684 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -25,6 +25,11 @@
<xsd:documentation><![CDATA[ The method retry times.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="failbacktasks" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ The maximum number of pending failback retry tasks. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="actives" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The max active requests.
]]></xsd:documentation>