This is an automated email from the ASF dual-hosted git repository.
iluo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 04eacfe New threadLocal provides more performance. (#1745)
04eacfe is described below
commit 04eacfeac7e4e25d3c2f0a2de12fa05708ee6b59
Author: 时无两丶 <[email protected]>
AuthorDate: Tue May 15 10:08:45 2018 +0800
New threadLocal provides more performance. (#1745)
* SerializerFactory 获取Serializer时,锁住整个hashmap,导致整个过程被block
* 单元测试。保证一个class只有一个serializer和deserializer。单线程和多线程测试
* 增加线程数 50 模拟多个线程来获取serializer和deserializer
* 当cores线程数全都使用的情况下,默认线程池会把任务放入到队列中。队列满则再创建线程(总数不会超过Max线程数)
增强线程池:在请求量阶段性出现高峰时使用
特性:cores线程全部使用的情况下,优先创建线程(总数不会超过max),当max个线程全都在忙的情况下,才将任务放入队列。请求量下降时,线程池会自动维持cores个线程,多余的线程退出。
* 当cores线程数全都使用的情况下,默认线程池会把任务放入到队列中。队列满则再创建线程(总数不会超过Max线程数)
增强线程池:在请求量阶段性出现高峰时使用
特性:cores线程全部使用的情况下,优先创建线程(总数不会超过max),当max个线程全都在忙的情况下,才将任务放入队列。请求量下降时,线程池会自动维持cores个线程,多余的线程退出。
* 补全单元测试,测试扩展是否生效
* 错误命名
* 增加@Override注解
long 初始化赋值时,小写l改为大写L防止误读
* 修复单元测试
* remove enhanced
* remove enhanced
* Faster ThreadLocal impl in internal use
* Used in RpcContext`s LOCAL field.
* Faster get than the traditional ThreadLocal
* add License
* fix ci failed
* fix ci failed
* fix ci failed
* fix ci failed
* fix ci failed
* remove author info
* fix destroy method
* fix bug at method size.
---
.../dubbo/common/threadlocal/InternalThread.java | 73 ++++++++
.../common/threadlocal/InternalThreadLocal.java | 197 +++++++++++++++++++++
.../common/threadlocal/InternalThreadLocalMap.java | 168 ++++++++++++++++++
.../threadlocal/NamedInternalThreadFactory.java | 47 +++++
.../support/cached/CachedThreadPool.java | 5 +-
.../threadpool/support/eager/EagerThreadPool.java | 4 +-
.../threadpool/support/fixed/FixedThreadPool.java | 4 +-
.../support/limited/LimitedThreadPool.java | 4 +-
.../dubbo/common/utils/NamedThreadFactory.java | 11 +-
.../threadlocal/InternalThreadLocalTest.java | 173 ++++++++++++++++++
.../java/com/alibaba/dubbo/rpc/RpcContext.java | 7 +-
11 files changed, 678 insertions(+), 15 deletions(-)
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThread.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThread.java
new file mode 100644
index 0000000..a836561
--- /dev/null
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThread.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadlocal;
+
+/**
+ * InternalThread
+ */
+public class InternalThread extends Thread {
+
+ private InternalThreadLocalMap threadLocalMap;
+
+ public InternalThread() {
+ }
+
+ public InternalThread(Runnable target) {
+ super(target);
+ }
+
+ public InternalThread(ThreadGroup group, Runnable target) {
+ super(group, target);
+ }
+
+ public InternalThread(String name) {
+ super(name);
+ }
+
+ public InternalThread(ThreadGroup group, String name) {
+ super(group, name);
+ }
+
+ public InternalThread(Runnable target, String name) {
+ super(target, name);
+ }
+
+ public InternalThread(ThreadGroup group, Runnable target, String name) {
+ super(group, target, name);
+ }
+
+ public InternalThread(ThreadGroup group, Runnable target, String name,
long stackSize) {
+ super(group, target, name, stackSize);
+ }
+
+ /**
+ * Returns the internal data structure that keeps the threadLocal
variables bound to this thread.
+ * Note that this method is for internal use only, and thus is subject to
change at any time.
+ */
+ public final InternalThreadLocalMap threadLocalMap() {
+ return threadLocalMap;
+ }
+
+ /**
+ * Sets the internal data structure that keeps the threadLocal variables
bound to this thread.
+ * Note that this method is for internal use only, and thus is subject to
change at any time.
+ */
+ public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap)
{
+ this.threadLocalMap = threadLocalMap;
+ }
+}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocal.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocal.java
new file mode 100644
index 0000000..4eed57c
--- /dev/null
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocal.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadlocal;
+
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
+
+/**
+ * InternalThreadLocal
+ * A special variant of {@link ThreadLocal} that yields higher access
performance when accessed from a
+ * {@link InternalThread}.
+ * <p></p>
+ * Internally, a {@link InternalThread} uses a constant index in an array,
instead of using hash code and hash table,
+ * to look for a variable. Although seemingly very subtle, it yields slight
performance advantage over using a hash
+ * table, and it is useful when accessed frequently.
+ * <p></p>
+ * This design is learning from {@see
io.netty.util.concurrent.FastThreadLocal} which is in Netty.
+ */
+public class InternalThreadLocal<V> {
+
+ private static final int variablesToRemoveIndex =
InternalThreadLocalMap.nextVariableIndex();
+
+ private final int index;
+
+ public InternalThreadLocal() {
+ index = InternalThreadLocalMap.nextVariableIndex();
+ }
+
+ /**
+ * Removes all {@link InternalThreadLocal} variables bound to the current
thread. This operation is useful when you
+ * are in a container environment, and you don't want to leave the thread
local variables in the threads you do not
+ * manage.
+ */
+ @SuppressWarnings("unchecked")
+ public static void removeAll() {
+ InternalThreadLocalMap threadLocalMap =
InternalThreadLocalMap.getIfSet();
+ if (threadLocalMap == null) {
+ return;
+ }
+
+ try {
+ Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
+ if (v != null && v != InternalThreadLocalMap.UNSET) {
+ Set<InternalThreadLocal<?>> variablesToRemove =
(Set<InternalThreadLocal<?>>) v;
+ for (InternalThreadLocal<?> tlv : variablesToRemove) {
+ tlv.remove(threadLocalMap);
+ }
+ }
+ } finally {
+ InternalThreadLocalMap.remove();
+ }
+ }
+
+ /**
+ * Returns the number of thread local variables bound to the current
thread.
+ */
+ public static int size() {
+ InternalThreadLocalMap threadLocalMap =
InternalThreadLocalMap.getIfSet();
+ if (threadLocalMap == null) {
+ return 0;
+ } else {
+ return threadLocalMap.size();
+ }
+ }
+
+ public static void destroy() {
+ InternalThreadLocalMap.destroy();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void addToVariablesToRemove(InternalThreadLocalMap
threadLocalMap, InternalThreadLocal<?> variable) {
+ Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
+ Set<InternalThreadLocal<?>> variablesToRemove;
+ if (v == InternalThreadLocalMap.UNSET || v == null) {
+ variablesToRemove = Collections.newSetFromMap(new
IdentityHashMap<InternalThreadLocal<?>, Boolean>());
+ threadLocalMap.setIndexedVariable(variablesToRemoveIndex,
variablesToRemove);
+ } else {
+ variablesToRemove = (Set<InternalThreadLocal<?>>) v;
+ }
+
+ variablesToRemove.add(variable);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void removeFromVariablesToRemove(InternalThreadLocalMap
threadLocalMap, InternalThreadLocal<?> variable) {
+
+ Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
+
+ if (v == InternalThreadLocalMap.UNSET || v == null) {
+ return;
+ }
+
+ Set<InternalThreadLocal<?>> variablesToRemove =
(Set<InternalThreadLocal<?>>) v;
+ variablesToRemove.remove(variable);
+ }
+
+ /**
+ * Returns the current value for the current thread
+ */
+ @SuppressWarnings("unchecked")
+ public final V get() {
+ InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
+ Object v = threadLocalMap.indexedVariable(index);
+ if (v != InternalThreadLocalMap.UNSET) {
+ return (V) v;
+ }
+
+ return initialize(threadLocalMap);
+ }
+
+ private V initialize(InternalThreadLocalMap threadLocalMap) {
+ V v = null;
+ try {
+ v = initialValue();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ threadLocalMap.setIndexedVariable(index, v);
+ addToVariablesToRemove(threadLocalMap, this);
+ return v;
+ }
+
+ /**
+ * Sets the value for the current thread.
+ */
+ public final void set(V value) {
+ if (value == null || value == InternalThreadLocalMap.UNSET) {
+ remove();
+ } else {
+ InternalThreadLocalMap threadLocalMap =
InternalThreadLocalMap.get();
+ if (threadLocalMap.setIndexedVariable(index, value)) {
+ addToVariablesToRemove(threadLocalMap, this);
+ }
+ }
+ }
+
+ /**
+ * Sets the value to uninitialized; a proceeding call to get() will
trigger a call to initialValue().
+ */
+ @SuppressWarnings("unchecked")
+ public final void remove() {
+ remove(InternalThreadLocalMap.getIfSet());
+ }
+
+ /**
+ * Sets the value to uninitialized for the specified thread local map;
+ * a proceeding call to get() will trigger a call to initialValue().
+ * The specified thread local map must be for the current thread.
+ */
+ @SuppressWarnings("unchecked")
+ public final void remove(InternalThreadLocalMap threadLocalMap) {
+ if (threadLocalMap == null) {
+ return;
+ }
+
+ Object v = threadLocalMap.removeIndexedVariable(index);
+ removeFromVariablesToRemove(threadLocalMap, this);
+
+ if (v != InternalThreadLocalMap.UNSET) {
+ try {
+ onRemoval((V) v);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Returns the initial value for this thread-local variable.
+ */
+ protected V initialValue() throws Exception {
+ return null;
+ }
+
+ /**
+ * Invoked when this thread local variable is removed by {@link #remove()}.
+ */
+ protected void onRemoval(@SuppressWarnings("unused") V value) throws
Exception {
+ }
+}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalMap.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalMap.java
new file mode 100644
index 0000000..e2aa0fd
--- /dev/null
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalMap.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadlocal;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The internal data structure that stores the threadLocal variables for Netty
and all {@link InternalThread}s.
+ * Note that this class is for internal use only. Use {@link InternalThread}
+ * unless you know what you are doing.
+ */
+public final class InternalThreadLocalMap {
+
+ private Object[] indexedVariables;
+
+ private static ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
new ThreadLocal<InternalThreadLocalMap>();
+
+ private static final AtomicInteger nextIndex = new AtomicInteger();
+
+ public static final Object UNSET = new Object();
+
+ public static InternalThreadLocalMap getIfSet() {
+ Thread thread = Thread.currentThread();
+ if (thread instanceof InternalThread) {
+ return ((InternalThread) thread).threadLocalMap();
+ }
+ return slowThreadLocalMap.get();
+ }
+
+ public static InternalThreadLocalMap get() {
+ Thread thread = Thread.currentThread();
+ if (thread instanceof InternalThread) {
+ return fastGet((InternalThread) thread);
+ }
+ return slowGet();
+ }
+
+ public static void remove() {
+ Thread thread = Thread.currentThread();
+ if (thread instanceof InternalThread) {
+ ((InternalThread) thread).setThreadLocalMap(null);
+ } else {
+ slowThreadLocalMap.remove();
+ }
+ }
+
+ public static void destroy() {
+ slowThreadLocalMap = null;
+ }
+
+ public static int nextVariableIndex() {
+ int index = nextIndex.getAndIncrement();
+ if (index < 0) {
+ nextIndex.decrementAndGet();
+ throw new IllegalStateException("Too many thread-local indexed
variables");
+ }
+ return index;
+ }
+
+ public static int lastVariableIndex() {
+ return nextIndex.get() - 1;
+ }
+
+ private InternalThreadLocalMap() {
+ indexedVariables = newIndexedVariableTable();
+ }
+
+ public Object indexedVariable(int index) {
+ Object[] lookup = indexedVariables;
+ return index < lookup.length ? lookup[index] : UNSET;
+ }
+
+ /**
+ * @return {@code true} if and only if a new thread-local variable has
been created
+ */
+ public boolean setIndexedVariable(int index, Object value) {
+ Object[] lookup = indexedVariables;
+ if (index < lookup.length) {
+ Object oldValue = lookup[index];
+ lookup[index] = value;
+ return oldValue == UNSET;
+ } else {
+ expandIndexedVariableTableAndSet(index, value);
+ return true;
+ }
+ }
+
+ public Object removeIndexedVariable(int index) {
+ Object[] lookup = indexedVariables;
+ if (index < lookup.length) {
+ Object v = lookup[index];
+ lookup[index] = UNSET;
+ return v;
+ } else {
+ return UNSET;
+ }
+ }
+
+ public int size() {
+ int count = 0;
+ for (Object o : indexedVariables) {
+ if (o != UNSET) {
+ ++count;
+ }
+ }
+
+ //the fist element in `indexedVariables` is a set to keep all the
InternalThreadLocal to remove
+ //look at method `addToVariablesToRemove`
+ return count - 1;
+ }
+
+ private static Object[] newIndexedVariableTable() {
+ Object[] array = new Object[32];
+ Arrays.fill(array, UNSET);
+ return array;
+ }
+
+ private static InternalThreadLocalMap fastGet(InternalThread thread) {
+ InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
+ if (threadLocalMap == null) {
+ thread.setThreadLocalMap(threadLocalMap = new
InternalThreadLocalMap());
+ }
+ return threadLocalMap;
+ }
+
+ private static InternalThreadLocalMap slowGet() {
+ ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
InternalThreadLocalMap.slowThreadLocalMap;
+ InternalThreadLocalMap ret = slowThreadLocalMap.get();
+ if (ret == null) {
+ ret = new InternalThreadLocalMap();
+ slowThreadLocalMap.set(ret);
+ }
+ return ret;
+ }
+
+ private void expandIndexedVariableTableAndSet(int index, Object value) {
+ Object[] oldArray = indexedVariables;
+ final int oldCapacity = oldArray.length;
+ int newCapacity = index;
+ newCapacity |= newCapacity >>> 1;
+ newCapacity |= newCapacity >>> 2;
+ newCapacity |= newCapacity >>> 4;
+ newCapacity |= newCapacity >>> 8;
+ newCapacity |= newCapacity >>> 16;
+ newCapacity++;
+
+ Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
+ Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
+ newArray[index] = value;
+ indexedVariables = newArray;
+ }
+}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/NamedInternalThreadFactory.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/NamedInternalThreadFactory.java
new file mode 100644
index 0000000..03868b9
--- /dev/null
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/NamedInternalThreadFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadlocal;
+
+import com.alibaba.dubbo.common.utils.NamedThreadFactory;
+
+/**
+ * NamedInternalThreadFactory
+ * This is a threadFactory which produce {@link InternalThread}
+ */
+public class NamedInternalThreadFactory extends NamedThreadFactory {
+
+ public NamedInternalThreadFactory() {
+ super();
+ }
+
+ public NamedInternalThreadFactory(String prefix) {
+ super(prefix, false);
+ }
+
+ public NamedInternalThreadFactory(String prefix, boolean daemon) {
+ super(prefix, daemon);
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ String name = mPrefix + mThreadNum.getAndIncrement();
+ InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
+ ret.setDaemon(mDaemon);
+ return ret;
+ }
+}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java
index f808ab9..c5aa285 100644
---
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java
@@ -18,9 +18,9 @@ package com.alibaba.dubbo.common.threadpool.support.cached;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
-import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -47,7 +47,6 @@ public class CachedThreadPool implements ThreadPool {
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
- new NamedThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
+ new NamedInternalThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
}
-
}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java
index eb4e1f3..2f97b6b 100644
---
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java
@@ -19,9 +19,9 @@ package com.alibaba.dubbo.common.threadpool.support.eager;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
-import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -48,7 +48,7 @@ public class EagerThreadPool implements ThreadPool {
alive,
TimeUnit.MILLISECONDS,
taskQueue,
- new NamedThreadFactory(name, true),
+ new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
index 15ebb20..d3b61a5 100644
---
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java
@@ -18,9 +18,9 @@ package com.alibaba.dubbo.common.threadpool.support.fixed;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
-import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,7 +44,7 @@ public class FixedThreadPool implements ThreadPool {
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
- new NamedThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
+ new NamedInternalThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
}
}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
index aa5a71f..b50bc9e 100644
---
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
@@ -19,9 +19,9 @@ package com.alibaba.dubbo.common.threadpool.support.limited;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
-import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
@@ -45,7 +45,7 @@ public class LimitedThreadPool implements ThreadPool {
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
- new NamedThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
+ new NamedInternalThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
}
}
diff --git
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java
index 3d30a03..36ace7d 100755
---
a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java
+++
b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java
@@ -23,15 +23,16 @@ import java.util.concurrent.atomic.AtomicInteger;
* InternalThreadFactory.
*/
public class NamedThreadFactory implements ThreadFactory {
- private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
- private final AtomicInteger mThreadNum = new AtomicInteger(1);
+ protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
- private final String mPrefix;
+ protected final AtomicInteger mThreadNum = new AtomicInteger(1);
- private final boolean mDaemon;
+ protected final String mPrefix;
- private final ThreadGroup mGroup;
+ protected final boolean mDaemon;
+
+ protected final ThreadGroup mGroup;
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
diff --git
a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalTest.java
b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalTest.java
new file mode 100644
index 0000000..b0a8c28
--- /dev/null
+++
b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadlocal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+public class InternalThreadLocalTest {
+
+ private static final int THREADS = 10;
+
+ private static final int PERFORMANCE_THREAD_COUNT = 1000;
+
+ private static final int GET_COUNT = 1000000;
+
+ @Test
+ public void testInternalThreadLocal() throws InterruptedException {
+ final AtomicInteger index = new AtomicInteger(0);
+
+ final InternalThreadLocal<Integer> internalThreadLocal = new
InternalThreadLocal<Integer>() {
+
+ @Override
+ protected Integer initialValue() throws Exception {
+ Integer v = index.getAndIncrement();
+ System.out.println("thread : " +
Thread.currentThread().getName() + " init value : " + v);
+ return v;
+ }
+ };
+
+ for (int i = 0; i < THREADS; i++) {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ internalThreadLocal.get();
+ }
+ });
+ t.start();
+ }
+
+ Thread.sleep(2000);
+ }
+
+ @Test
+ public void testSetAndGet() {
+ final Integer testVal = 10;
+ final InternalThreadLocal<Integer> internalThreadLocal = new
InternalThreadLocal<Integer>();
+ internalThreadLocal.set(testVal);
+ Assert.assertTrue("set is not equals get",
+ Objects.equals(testVal, internalThreadLocal.get()));
+ }
+
+ @Test
+ public void testMultiThreadSetAndGet() throws InterruptedException {
+ final Integer testVal1 = 10;
+ final Integer testVal2 = 20;
+ final InternalThreadLocal<Integer> internalThreadLocal = new
InternalThreadLocal<Integer>();
+ final CountDownLatch countDownLatch = new CountDownLatch(2);
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+ internalThreadLocal.set(testVal1);
+ Assert.assertTrue("set is not equals get",
+ Objects.equals(testVal1, internalThreadLocal.get()));
+ countDownLatch.countDown();
+ }
+ });
+ t1.start();
+
+ Thread t2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ internalThreadLocal.set(testVal2);
+ Assert.assertTrue("set is not equals get",
+ Objects.equals(testVal2, internalThreadLocal.get()));
+ countDownLatch.countDown();
+ }
+ });
+ t2.start();
+ countDownLatch.await();
+ }
+
+ /**
+ * print
+ * take[2689]ms
+ * <p></p>
+ * This test is based on a Machine with 4 core and 16g memory.
+ */
+ @Test
+ public void testPerformanceTradition() {
+ final ThreadLocal<String>[] caches1 = new
ThreadLocal[PERFORMANCE_THREAD_COUNT];
+ final Thread mainThread = Thread.currentThread();
+ for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) {
+ caches1[i] = new ThreadLocal<String>();
+ }
+ Thread t1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) {
+ caches1[i].set("float.lu");
+ }
+ long start = System.nanoTime();
+ for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) {
+ for (int j = 0; j < GET_COUNT; j++) {
+ caches1[i].get();
+ }
+ }
+ long end = System.nanoTime();
+ System.out.println("take[" + TimeUnit.NANOSECONDS.toMillis(end
- start) +
+ "]ms");
+ LockSupport.unpark(mainThread);
+ }
+ });
+ t1.start();
+ LockSupport.park(mainThread);
+ }
+
+ /**
+ * print
+ * take[14]ms
+ * <p></p>
+ * This test is based on a Machine with 4 core and 16g memory.
+ */
+ @Test
+ public void testPerformance() {
+ final InternalThreadLocal<String>[] caches = new
InternalThreadLocal[PERFORMANCE_THREAD_COUNT];
+ final Thread mainThread = Thread.currentThread();
+ for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) {
+ caches[i] = new InternalThreadLocal<String>();
+ }
+ Thread t = new InternalThread(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) {
+ caches[i].set("float.lu");
+ }
+ long start = System.nanoTime();
+ for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) {
+ for (int j = 0; j < GET_COUNT; j++) {
+ caches[i].get();
+ }
+ }
+ long end = System.nanoTime();
+ System.out.println("take[" + TimeUnit.NANOSECONDS.toMillis(end
- start) +
+ "]ms");
+ LockSupport.unpark(mainThread);
+ }
+ });
+ t.start();
+ LockSupport.park(mainThread);
+ }
+}
\ No newline at end of file
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
index a542cd6..bf56e2d 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java
@@ -18,6 +18,7 @@ package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.threadlocal.InternalThreadLocal;
import com.alibaba.dubbo.common.utils.NetUtils;
import java.net.InetSocketAddress;
@@ -45,12 +46,16 @@ import java.util.concurrent.TimeoutException;
*/
public class RpcContext {
- private static final ThreadLocal<RpcContext> LOCAL = new
ThreadLocal<RpcContext>() {
+ /**
+ * use internal thread local to improve performance
+ */
+ private static final InternalThreadLocal<RpcContext> LOCAL = new
InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
+
private final Map<String, String> attachments = new HashMap<String,
String>();
private final Map<String, Object> values = new HashMap<String, Object>();
private Future<?> future;
--
To stop receiving notification emails like this one, please contact
[email protected].