This is an automated email from the ASF dual-hosted git repository.
oxsean pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 51f4f7454e Performance tuning (#14604)
51f4f7454e is described below
commit 51f4f7454e70c2acd31355763a5186f9d6f257f9
Author: Sean Yang <[email protected]>
AuthorDate: Mon Sep 2 18:29:10 2024 +0800
Performance tuning (#14604)
Co-authored-by: Albumen Kevin <[email protected]>
---
.editorconfig | 2 +
.../apache/dubbo/common/BaseServiceMetadata.java | 2 +-
.../common/beans/factory/ScopeBeanFactory.java | 46 ++++--
.../support/FailsafeErrorTypeAwareLogger.java | 12 ++
.../common/logger/support/FailsafeLogger.java | 90 +++++------
.../org/apache/dubbo/common/utils/UrlUtils.java | 7 +
.../rpc/model/ReflectionMethodDescriptor.java | 6 +
.../dubbo/rpc/model/StubMethodDescriptor.java | 7 +
.../support/FailsafeErrorTypeAwareLoggerTest.java | 3 +
.../common/logger/support/FailsafeLoggerTest.java | 7 +
.../protocol/tri/servlet/ServletStreamChannel.java | 53 ++++++-
.../rpc/protocol/tri/servlet/TripleFilter.java | 4 +-
.../remoting/api/AbstractHttpProtocolDetector.java | 168 ---------------------
.../http12/AbstractServerHttpChannelObserver.java | 2 +-
.../dubbo/remoting/http12/HttpChannelObserver.java | 4 +-
.../remoting/http12/HttpTransportListener.java | 3 +-
.../http12/h2/Http2ServerChannelObserver.java | 2 +-
.../remoting/http12/h2/Http2TransportListener.java | 5 +-
.../netty4/h1/NettyHttp1ConnectionHandler.java | 1 -
.../h2/NettyHttp2ProtocolSelectorHandler.java | 28 ++--
.../tri/h12/AbstractServerTransportListener.java | 38 ++++-
.../protocol/tri/h12/grpc/GrpcCompositeCodec.java | 19 ++-
.../DefaultHttp11ServerTransportListener.java | 2 +-
.../http2/GenericHttp2ServerTransportListener.java | 2 +-
.../dubbo/rpc/protocol/tri/rest/RestConstants.java | 1 -
.../rest/filter/RestExtensionExecutionFilter.java | 10 +-
.../tri/rest/support/basic/RestProtocolTest.groovy | 15 ++
.../rpc/protocol/tri/rest/service/DemoService.java | 3 +
.../protocol/tri/rest/service/DemoServiceImpl.java | 5 +
29 files changed, 274 insertions(+), 273 deletions(-)
diff --git a/.editorconfig b/.editorconfig
index 5afaa5666b..b9e9081f75 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -58,6 +58,8 @@ ij_java_extends_list_wrap = normal
ij_java_extends_keyword_wrap = normal
ij_java_binary_operation_wrap = normal
ij_java_binary_operation_sign_on_next_line = true
+ij_java_generate_final_locals = false
+ij_java_generate_final_parameters = false
[*.groovy]
max_line_length = 180
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java b/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
index 58d0ac3796..17050e0e7c 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/BaseServiceMetadata.java
@@ -46,7 +46,7 @@ public class BaseServiceMetadata {
if (StringUtils.isNotEmpty(version)) {
buf.append(':').append(version);
}
- return buf.toString().intern();
+ return buf.toString();
}
public static String versionFromServiceKey(String serviceKey) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/beans/factory/ScopeBeanFactory.java b/dubbo-common/src/main/java/org/apache/dubbo/common/beans/factory/ScopeBeanFactory.java
index e57b501316..b02d4c97fe 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/beans/factory/ScopeBeanFactory.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/beans/factory/ScopeBeanFactory.java
@@ -24,15 +24,17 @@ import
org.apache.dubbo.common.extension.ExtensionPostProcessor;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.Disposable;
-import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.Pair;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.model.ScopeModelAccessor;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -44,18 +46,19 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.CONFIG_FAILE
/**
* A bean factory for internal sharing.
*/
-public class ScopeBeanFactory {
+public final class ScopeBeanFactory {
- protected static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(ScopeBeanFactory.class);
+ private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(ScopeBeanFactory.class);
private final ScopeBeanFactory parent;
private final ExtensionAccessor extensionAccessor;
private final List<ExtensionPostProcessor> extensionPostProcessors;
- private final ConcurrentHashMap<Class<?>, AtomicInteger>
beanNameIdCounterMap = new ConcurrentHashMap<>();
+ private final Map<Class<?>, AtomicInteger> beanNameIdCounterMap =
CollectionUtils.newConcurrentHashMap();
private final List<BeanInfo> registeredBeanInfos = new
CopyOnWriteArrayList<>();
private InstantiationStrategy instantiationStrategy;
private final AtomicBoolean destroyed = new AtomicBoolean();
private final Set<Class<?>> registeredClasses = new ConcurrentHashSet<>();
+ private final Map<Pair<Class<?>, String>, Optional<Object>> beanCache =
CollectionUtils.newConcurrentHashMap();
public ScopeBeanFactory(ScopeBeanFactory parent, ExtensionAccessor
extensionAccessor) {
this.parent = parent;
@@ -77,7 +80,7 @@ public class ScopeBeanFactory {
}
public <T> T registerBean(Class<T> bean) throws ScopeBeanException {
- return this.getOrRegisterBean(null, bean);
+ return getOrRegisterBean(null, bean);
}
public <T> T registerBean(String name, Class<T> clazz) throws
ScopeBeanException {
@@ -101,7 +104,7 @@ public class ScopeBeanFactory {
}
public void registerBean(Object bean) {
- this.registerBean(null, bean);
+ registerBean(null, bean);
}
public void registerBean(String name, Object bean) {
@@ -118,12 +121,14 @@ public class ScopeBeanFactory {
initializeBean(name, bean);
registeredBeanInfos.add(new BeanInfo(name, bean));
+ beanCache.entrySet().removeIf(e ->
e.getKey().getLeft().isAssignableFrom(beanClass));
}
public <T> T getOrRegisterBean(Class<T> type) {
return getOrRegisterBean(null, type);
}
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public <T> T getOrRegisterBean(String name, Class<T> type) {
T bean = getBean(name, type);
if (bean == null) {
@@ -143,6 +148,7 @@ public class ScopeBeanFactory {
return getOrRegisterBean(null, type, mappingFunction);
}
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public <T> T getOrRegisterBean(
String name, Class<T> type, Function<? super Class<T>, ? extends
T> mappingFunction) {
T bean = getBean(name, type);
@@ -186,7 +192,8 @@ public class ScopeBeanFactory {
}
private int getNextId(Class<?> beanClass) {
- return ConcurrentHashMapUtils.computeIfAbsent(beanNameIdCounterMap,
beanClass, key -> new AtomicInteger())
+ return beanNameIdCounterMap
+ .computeIfAbsent(beanClass, key -> new AtomicInteger())
.incrementAndGet();
}
@@ -203,20 +210,36 @@ public class ScopeBeanFactory {
}
public <T> T getBean(Class<T> type) {
- return this.getBean(null, type);
+ return getBean(null, type);
}
public <T> T getBean(String name, Class<T> type) {
- T bean = getBeanInternal(name, type);
+ T bean = getBeanFromCache(name, type);
if (bean == null && parent != null) {
return parent.getBean(name, type);
}
return bean;
}
+ @SuppressWarnings("unchecked")
+ private <T> T getBeanFromCache(String name, Class<T> type) {
+ Object value = beanCache
+ .computeIfAbsent(Pair.of(type, name), k -> {
+ try {
+ return Optional.ofNullable(getBeanInternal(name,
type));
+ } catch (ScopeBeanException e) {
+ return Optional.of(e);
+ }
+ })
+ .orElse(null);
+ if (value instanceof ScopeBeanException) {
+ throw (ScopeBeanException) value;
+ }
+ return (T) value;
+ }
+
@SuppressWarnings("unchecked")
private <T> T getBeanInternal(String name, Class<T> type) {
- checkDestroyed();
// All classes are derived from java.lang.Object, cannot filter bean
by it
if (type == Object.class) {
return null;
@@ -278,6 +301,7 @@ public class ScopeBeanFactory {
}
}
registeredBeanInfos.clear();
+ beanCache.clear();
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLogger.java b/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLogger.java
index c9d7d2b303..d30668c4a3 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLogger.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLogger.java
@@ -98,6 +98,9 @@ public class FailsafeErrorTypeAwareLogger extends
FailsafeLogger implements List
try {
onEvent(code, msg);
+ if (!getLogger().isWarnEnabled()) {
+ return;
+ }
getLogger().warn(appendContextMessageWithInstructions(code, cause,
extendedInformation, msg));
} catch (Throwable t) {
// ignored.
@@ -112,6 +115,9 @@ public class FailsafeErrorTypeAwareLogger extends
FailsafeLogger implements List
try {
onEvent(code, msg);
+ if (!getLogger().isWarnEnabled()) {
+ return;
+ }
getLogger().warn(appendContextMessageWithInstructions(code, cause,
extendedInformation, msg), e);
} catch (Throwable t) {
// ignored.
@@ -126,6 +132,9 @@ public class FailsafeErrorTypeAwareLogger extends
FailsafeLogger implements List
try {
onEvent(code, msg);
+ if (!getLogger().isErrorEnabled()) {
+ return;
+ }
getLogger().error(appendContextMessageWithInstructions(code,
cause, extendedInformation, msg));
} catch (Throwable t) {
// ignored.
@@ -140,6 +149,9 @@ public class FailsafeErrorTypeAwareLogger extends
FailsafeLogger implements List
try {
onEvent(code, msg);
+ if (!getLogger().isErrorEnabled()) {
+ return;
+ }
getLogger().error(appendContextMessageWithInstructions(code,
cause, extendedInformation, msg), e);
} catch (Throwable t) {
// ignored.
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeLogger.java b/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeLogger.java
index d329fb2565..a9c07e9c1f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeLogger.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/logger/support/FailsafeLogger.java
@@ -53,221 +53,221 @@ public class FailsafeLogger implements Logger {
@Override
public void trace(String msg, Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isTraceEnabled()) {
return;
}
try {
logger.trace(appendContextMessage(msg), e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void trace(Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isTraceEnabled()) {
return;
}
try {
logger.trace(e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void trace(String msg) {
- if (disabled) {
+ if (disabled || !logger.isTraceEnabled()) {
return;
}
try {
logger.trace(appendContextMessage(msg));
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void trace(String msg, Object... arguments) {
- if (disabled) {
+ if (disabled || !logger.isTraceEnabled()) {
return;
}
try {
logger.trace(appendContextMessage(msg), arguments);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void debug(String msg, Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isDebugEnabled()) {
return;
}
try {
logger.debug(appendContextMessage(msg), e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void debug(Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isDebugEnabled()) {
return;
}
try {
logger.debug(e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void debug(String msg) {
- if (disabled) {
+ if (disabled || !logger.isDebugEnabled()) {
return;
}
try {
logger.debug(appendContextMessage(msg));
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void debug(String msg, Object... arguments) {
- if (disabled) {
+ if (disabled || !logger.isDebugEnabled()) {
return;
}
try {
logger.debug(appendContextMessage(msg), arguments);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void info(String msg, Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isInfoEnabled()) {
return;
}
try {
logger.info(appendContextMessage(msg), e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void info(String msg) {
- if (disabled) {
+ if (disabled || !logger.isInfoEnabled()) {
return;
}
try {
logger.info(appendContextMessage(msg));
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void info(String msg, Object... arguments) {
- if (disabled) {
+ if (disabled || !logger.isInfoEnabled()) {
return;
}
try {
logger.info(appendContextMessage(msg), arguments);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void warn(String msg, Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isWarnEnabled()) {
return;
}
try {
logger.warn(appendContextMessage(msg), e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void warn(String msg) {
- if (disabled) {
+ if (disabled || !logger.isWarnEnabled()) {
return;
}
try {
logger.warn(appendContextMessage(msg));
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void warn(String msg, Object... arguments) {
- if (disabled) {
+ if (disabled || !logger.isWarnEnabled()) {
return;
}
try {
logger.warn(appendContextMessage(msg), arguments);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void error(String msg, Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isErrorEnabled()) {
return;
}
try {
logger.error(appendContextMessage(msg), e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void error(String msg) {
- if (disabled) {
+ if (disabled || !logger.isErrorEnabled()) {
return;
}
try {
logger.error(appendContextMessage(msg));
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void error(String msg, Object... arguments) {
- if (disabled) {
+ if (disabled || !logger.isErrorEnabled()) {
return;
}
try {
logger.error(appendContextMessage(msg), arguments);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void error(Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isErrorEnabled()) {
return;
}
try {
logger.error(e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void info(Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isInfoEnabled()) {
return;
}
try {
logger.info(e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@Override
public void warn(Throwable e) {
- if (disabled) {
+ if (disabled || !logger.isWarnEnabled()) {
return;
}
try {
logger.warn(e);
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
}
}
@@ -278,7 +278,7 @@ public class FailsafeLogger implements Logger {
}
try {
return logger.isTraceEnabled();
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
return false;
}
}
@@ -290,7 +290,7 @@ public class FailsafeLogger implements Logger {
}
try {
return logger.isDebugEnabled();
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
return false;
}
}
@@ -302,7 +302,7 @@ public class FailsafeLogger implements Logger {
}
try {
return logger.isInfoEnabled();
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
return false;
}
}
@@ -314,7 +314,7 @@ public class FailsafeLogger implements Logger {
}
try {
return logger.isWarnEnabled();
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
return false;
}
}
@@ -326,7 +326,7 @@ public class FailsafeLogger implements Logger {
}
try {
return logger.isErrorEnabled();
- } catch (Throwable t) {
+ } catch (Throwable ignored) {
return false;
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
index 89e6b52b8d..ff7c88394e 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -686,4 +687,10 @@ public class UrlUtils {
public static boolean isConsumer(URL url) {
return url.getProtocol().equalsIgnoreCase(CONSUMER) || url.getPort()
== 0;
}
+
+ @SuppressWarnings("unchecked")
+ public static <T> T computeServiceAttribute(URL url, String key,
Function<URL, T> fn) {
+ return (T)
+
url.getServiceModel().getServiceMetadata().getAttributeMap().computeIfAbsent(key,
k -> fn.apply(url));
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java
index ea27aad53c..6bb9ddcb33 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionMethodDescriptor.java
@@ -32,6 +32,7 @@ import java.util.stream.Stream;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE_ASYNC;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_REFLECTIVE_OPERATION_FAILED;
+import static org.apache.dubbo.common.utils.MethodUtils.toShortString;
public class ReflectionMethodDescriptor implements MethodDescriptor {
private static final ErrorTypeAwareLogger logger =
@@ -183,4 +184,9 @@ public class ReflectionMethodDescriptor implements
MethodDescriptor {
result = 31 * result + Arrays.hashCode(returnTypes);
return result;
}
+
+ @Override
+ public String toString() {
+ return "ReflectionMethodDescriptor{method='" + toShortString(method) +
"', rpcType=" + rpcType + '}';
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java
index 56793dd6f9..784c93e12b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/StubMethodDescriptor.java
@@ -138,4 +138,11 @@ public class StubMethodDescriptor implements
MethodDescriptor, PackableMethod {
public UnPack getRequestUnpack() {
return requestUnpack;
}
+
+ @Override
+ public String toString() {
+ return "StubMethodDescriptor{" + "method=" + methodName + '('
+ + (parameterClasses.length > 0 ?
parameterClasses[0].getSimpleName() : "") + "), rpcType='" + rpcType
+ + "'}";
+ }
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLoggerTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLoggerTest.java
index 898d4c44c4..be230996a9 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLoggerTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeErrorTypeAwareLoggerTest.java
@@ -30,6 +30,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests for FailsafeErrorTypeAwareLogger to test whether it 'ignores'
exceptions thrown by logger or not.
@@ -64,6 +65,8 @@ class FailsafeErrorTypeAwareLoggerTest {
@Test
void testSuccessLogger() {
Logger successLogger = mock(Logger.class);
+ when(successLogger.isErrorEnabled()).thenReturn(true);
+ when(successLogger.isWarnEnabled()).thenReturn(true);
FailsafeErrorTypeAwareLogger failsafeLogger = new
FailsafeErrorTypeAwareLogger(successLogger);
failsafeLogger.error(REGISTRY_ADDRESS_INVALID, "Registry center", "May
be it's offline.", "error");
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeLoggerTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeLoggerTest.java
index df8afe0394..65de0c9ce1 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeLoggerTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/logger/support/FailsafeLoggerTest.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -67,6 +68,12 @@ class FailsafeLoggerTest {
@Test
void testSuccessLogger() {
Logger successLogger = mock(Logger.class);
+ Mockito.when(successLogger.isErrorEnabled()).thenReturn(true);
+ Mockito.when(successLogger.isWarnEnabled()).thenReturn(true);
+ Mockito.when(successLogger.isInfoEnabled()).thenReturn(true);
+ Mockito.when(successLogger.isDebugEnabled()).thenReturn(true);
+ Mockito.when(successLogger.isTraceEnabled()).thenReturn(true);
+
FailsafeLogger failsafeLogger = new FailsafeLogger(successLogger);
failsafeLogger.error("error");
failsafeLogger.warn("warn");
diff --git a/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/ServletStreamChannel.java b/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/ServletStreamChannel.java
index 934d18c118..af1e12a906 100644
--- a/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/ServletStreamChannel.java
+++ b/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/ServletStreamChannel.java
@@ -43,12 +43,17 @@ import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
final class ServletStreamChannel implements H2StreamChannel {
private static final Logger LOGGER =
LoggerFactory.getLogger(ServletStreamChannel.class);
+ private final Queue<Object> writeQueue = new ConcurrentLinkedQueue<>();
+ private final AtomicBoolean writeable = new AtomicBoolean();
private final HttpServletRequest request;
private final HttpServletResponse response;
private final AsyncContext context;
@@ -93,6 +98,28 @@ final class ServletStreamChannel implements H2StreamChannel {
}
}
+ public void onWritePossible() {
+ if (writeable.compareAndSet(false, true)) {
+ flushQueue();
+ }
+ }
+
+ private void flushQueue() {
+ if (writeQueue.isEmpty()) {
+ return;
+ }
+ synchronized (writeQueue) {
+ Object obj;
+ while ((obj = writeQueue.poll()) != null) {
+ if (obj instanceof HttpMetadata) {
+ writeHeaderInternal((HttpMetadata) obj);
+ } else if (obj instanceof HttpOutputMessage) {
+ writeMessageInternal((HttpOutputMessage) obj);
+ }
+ }
+ }
+ }
+
@Override
public CompletableFuture<Void> writeResetFrame(long errorCode) {
if (isGrpc) {
@@ -129,6 +156,16 @@ final class ServletStreamChannel implements
H2StreamChannel {
@Override
public CompletableFuture<Void> writeHeader(HttpMetadata httpMetadata) {
+ if (writeable.get()) {
+ flushQueue();
+ writeHeaderInternal(httpMetadata);
+ } else {
+ writeQueue.add(httpMetadata);
+ }
+ return completed();
+ }
+
+ private void writeHeaderInternal(HttpMetadata httpMetadata) {
boolean endStream = false;
boolean isHttp1 = true;
if (httpMetadata instanceof Http2Header) {
@@ -145,11 +182,11 @@ final class ServletStreamChannel implements
H2StreamChannel {
}
return map;
});
- return completed();
+ return;
}
if (response.isCommitted()) {
- return completed();
+ return;
}
for (Entry<CharSequence, String> entry : headers) {
@@ -173,11 +210,20 @@ final class ServletStreamChannel implements
H2StreamChannel {
context.complete();
}
}
- return completed();
}
@Override
public CompletableFuture<Void> writeMessage(HttpOutputMessage
httpOutputMessage) {
+ if (writeable.get()) {
+ flushQueue();
+ writeMessageInternal(httpOutputMessage);
+ } else {
+ writeQueue.add(httpOutputMessage);
+ }
+ return completed();
+ }
+
+ private void writeMessageInternal(HttpOutputMessage httpOutputMessage) {
boolean endStream = false;
if (httpOutputMessage instanceof Http2OutputMessage) {
endStream = ((Http2OutputMessage) httpOutputMessage).isEndStream();
@@ -196,7 +242,6 @@ final class ServletStreamChannel implements H2StreamChannel
{
context.complete();
}
}
- return completed();
}
@Override
diff --git a/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java b/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java
index c07ca8d507..85175d3d24 100644
--- a/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java
+++ b/dubbo-plugin/dubbo-triple-servlet/src/main/java/org/apache/dubbo/rpc/protocol/tri/servlet/TripleFilter.java
@@ -256,7 +256,9 @@ public class TripleFilter implements Filter {
}
@Override
- public void onWritePossible() {}
+ public void onWritePossible() {
+ channel.onWritePossible();
+ }
@Override
public void onError(Throwable t) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractHttpProtocolDetector.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractHttpProtocolDetector.java
deleted file mode 100644
index 6cd013f81d..0000000000
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractHttpProtocolDetector.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.remoting.api;
-
-import org.apache.dubbo.remoting.buffer.ChannelBuffer;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.DELETE;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.GET;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.HEAD;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.OPTIONS;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.PATCH;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.POST;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.PUT;
-import static
org.apache.dubbo.remoting.api.AbstractHttpProtocolDetector.HttpMethod.TRACE;
-
-/**
- * http protocol detector
- */
-public abstract class AbstractHttpProtocolDetector implements ProtocolDetector
{
-
- protected int empty = ' ';
- protected static String SIMPLE_HTTP = "XXX HTTP/1";
-
- protected static final List<HttpMethod> QOS_HTTP_METHOD =
Arrays.asList(GET, POST);
-
- /**
- * rank by frequency
- * first GET ,POST,DELETE,PUT
- * second HEAD,PATCH,OPTIONS,TRACE
- */
- protected static final List<HttpMethod> HTTP_METHODS =
- Arrays.asList(GET, POST, DELETE, PUT, HEAD, PATCH, OPTIONS, TRACE);
-
- protected static char[][] getHttpMethodsPrefix(int length,
List<HttpMethod> httpMethods) {
- if (0 >= length || length > 3) {
- throw new IllegalArgumentException("Current substring length is
beyond Http methods length");
- }
-
- List<char[]> prefix = new ArrayList<>();
- for (HttpMethod httpMethod : httpMethods) {
- prefix.add(httpMethod.getValue().substring(0,
length).toCharArray());
- }
-
- return prefix.toArray(new char[0][]);
- }
-
- protected static char[][] getHttpMethodsPrefix() {
- return getHttpMethodsPrefix(3, HTTP_METHODS);
- }
-
- protected static char[][] getQOSHttpMethodsPrefix() {
- return getHttpMethodsPrefix(3, QOS_HTTP_METHOD);
- }
-
- /**
- * qos /name/appName
- *
- * @param requestUrl
- * @return
- */
- protected boolean isQosRequestURL(String requestUrl) {
-
- if (requestUrl == null) {
- return false;
- }
-
- String[] split = requestUrl.split("/");
-
- if (split.length <= 3) {
- return true;
- }
-
- return false;
- }
-
- protected String splitAndGetFirst(String str) {
-
- return splitAndGet(str, 1);
- }
-
- protected String splitAndGet(String str, int index) {
- if (str == null) {
- return null;
- }
-
- String[] split = str.split("/");
-
- if (split.length - 1 < index) {
- return null;
- }
-
- return split[index];
- }
-
- /**
- * between first and second empty char
- *
- * @param buffer
- * @return
- */
- protected String readRequestLine(ChannelBuffer buffer) {
-
- // GET /test/demo HTTP/1.1
- int firstEmptyIndex = 0;
- // read first empty
- for (int i = 0; i < Integer.MAX_VALUE; i++) {
-
- int read = getByteByIndex(buffer, i);
- if (read == empty) {
- firstEmptyIndex = i;
- break;
- }
- }
-
- StringBuilder stringBuilder = new StringBuilder();
-
- for (int i = firstEmptyIndex + 1; i < Integer.MAX_VALUE; i++) {
- int read = getByteByIndex(buffer, i);
- // second empty break
- if (read == empty) {
- break;
- }
- stringBuilder.append((char) read);
- }
-
- return stringBuilder.toString();
- }
-
- public enum HttpMethod {
- GET("GET"),
- HEAD("HEAD"),
- POST("POST"),
- PUT("PUT"),
-
- PATCH("PATCH"),
- DELETE("DELETE"),
- OPTIONS("OPTIONS"),
- TRACE("TRACE");
-
- HttpMethod(String value) {
- this.value = value;
- }
-
- private String value;
-
- public String getValue() {
- return value;
- }
- }
-}
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index cc77b06a00..8c584efe41 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -273,7 +273,7 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
}
@Override
- public void close() throws Exception {
+ public void close() {
closed();
}
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
index 294d7311bf..c745655c4d 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
@@ -18,7 +18,9 @@ package org.apache.dubbo.remoting.http12;
import org.apache.dubbo.common.stream.StreamObserver;
-public interface HttpChannelObserver<T> extends StreamObserver<T>,
AutoCloseable {
+public interface HttpChannelObserver<T> extends StreamObserver<T> {
HttpChannel getHttpChannel();
+
+ void close();
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
index 57e1419eb0..9265a3ba93 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
@@ -16,8 +16,7 @@
*/
package org.apache.dubbo.remoting.http12;
-public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE
extends HttpInputMessage>
- extends AutoCloseable {
+public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE
extends HttpInputMessage> {
void onMetadata(HEADER metadata);
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index f68ae99821..4e7eaeb17d 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -109,7 +109,7 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
}
@Override
- public void close() throws Exception {
+ public void close() {
super.close();
streamingDecoder.onStreamClosed();
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
index 16531e7a93..2c4de24504 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
@@ -16,4 +16,7 @@
*/
package org.apache.dubbo.remoting.http12.h2;
-public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {}
+public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {
+
+ void close();
+}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
index 452faba1b2..72649fce24 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
@@ -55,6 +55,5 @@ public class NettyHttp1ConnectionHandler extends
SimpleChannelInboundHandler<Htt
new NettyHttp1Channel(ctx.channel(), tripleConfig), url,
frameworkModel);
http1TransportListener.onMetadata(http1Request);
http1TransportListener.onData(http1Request);
- ctx.channel().closeFuture().addListener(future ->
http1TransportListener.close());
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index 26fda3e87e..9dc05b48d3 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -17,6 +17,9 @@
package org.apache.dubbo.remoting.http12.netty4.h2;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.config.nested.TripleConfig;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpHeaders;
@@ -32,13 +35,14 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
import java.util.Set;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2StreamChannel;
public class NettyHttp2ProtocolSelectorHandler extends
SimpleChannelInboundHandler<HttpMetadata> {
+ private static final String TRANSPORT_LISTENER_FACTORY_CACHE =
"TRANSPORT_LISTENER_FACTORY_CACHE";
private final URL url;
private final FrameworkModel frameworkModel;
@@ -62,22 +66,28 @@ public class NettyHttp2ProtocolSelectorHandler extends
SimpleChannelInboundHandl
protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata
metadata) {
HttpHeaders headers = metadata.headers();
String contentType =
headers.getFirst(HttpHeaderNames.CONTENT_TYPE.getName());
- Http2ServerTransportListenerFactory factory =
determineHttp2ServerTransportListenerFactory(contentType);
+ Http2ServerTransportListenerFactory factory =
UrlUtils.computeServiceAttribute(
+ url,
+ TRANSPORT_LISTENER_FACTORY_CACHE,
+ url -> CollectionUtils.<String,
Http2ServerTransportListenerFactory>newConcurrentHashMap())
+ .computeIfAbsent(
+ contentType == null ? StringUtils.EMPTY_STRING :
contentType,
+ key ->
determineHttp2ServerTransportListenerFactory(contentType));
if (factory == null) {
throw new UnsupportedMediaTypeException(contentType);
}
- H2StreamChannel h2StreamChannel = new
NettyH2StreamChannel((Http2StreamChannel) ctx.channel(), tripleConfig);
- HttpWriteQueueHandler writeQueueHandler =
-
ctx.channel().parent().pipeline().get(HttpWriteQueueHandler.class);
+ Channel channel = ctx.channel();
+ H2StreamChannel h2StreamChannel = new
NettyH2StreamChannel((Http2StreamChannel) channel, tripleConfig);
+ HttpWriteQueueHandler writeQueueHandler =
channel.parent().pipeline().get(HttpWriteQueueHandler.class);
if (writeQueueHandler != null) {
HttpWriteQueue writeQueue = writeQueueHandler.getWriteQueue();
h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel,
writeQueue);
}
- ChannelPipeline pipeline = ctx.pipeline();
Http2TransportListener http2TransportListener =
factory.newInstance(h2StreamChannel, url, frameworkModel);
- ctx.channel().closeFuture().addListener(future ->
http2TransportListener.close());
- pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel,
http2TransportListener));
- pipeline.remove(this);
+ channel.closeFuture().addListener(future ->
http2TransportListener.close());
+ ctx.pipeline()
+ .addLast(new NettyHttp2FrameHandler(h2StreamChannel,
http2TransportListener))
+ .remove(this);
ctx.fireChannelRead(metadata);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
index f832bdd57c..17cfb01ecb 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
@@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.FluentLogger;
-import org.apache.dubbo.common.utils.MethodUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.http12.ExceptionHandler;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpInputMessage;
@@ -54,13 +54,13 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
implements HttpTransportListener<HEADER, MESSAGE> {
private static final FluentLogger LOGGER =
FluentLogger.of(AbstractServerTransportListener.class);
+ private static final String HEADER_FILTERS_CACHE = "HEADER_FILTERS_CACHE";
private final FrameworkModel frameworkModel;
private final URL url;
private final HttpChannel httpChannel;
private final RequestRouter requestRouter;
private final ExceptionHandler<Throwable, ?> exceptionHandler;
- private final List<HeaderFilter> headerFilters;
private Executor executor;
private HEADER httpMetadata;
@@ -73,9 +73,6 @@ public abstract class AbstractServerTransportListener<HEADER
extends RequestMeta
this.httpChannel = httpChannel;
requestRouter =
frameworkModel.getBeanFactory().getOrRegisterBean(DefaultRequestRouter.class);
exceptionHandler =
frameworkModel.getBeanFactory().getOrRegisterBean(CompositeExceptionHandler.class);
- headerFilters = frameworkModel
- .getExtensionLoader(HeaderFilter.class)
- .getActivateExtension(url, CommonConstants.HEADER_FILTER_KEY);
}
@Override
@@ -125,6 +122,15 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
@Override
public void onData(MESSAGE message) {
+ if (executor == null) {
+ try {
+ Throwable t = new NullPointerException("Executor not
initialized");
+ logError(t);
+ onError(message, t);
+ } finally {
+ onFinally(message);
+ }
+ }
executor.execute(() -> {
try {
doOnData(message);
@@ -179,7 +185,7 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
if (context != null) {
MethodDescriptor md = context.getMethodDescriptor();
if (md != null) {
- sb.append(",
method=").append(MethodUtils.toShortString(md.getMethod()));
+ sb.append(", method=").append(md);
}
if (TripleProtocol.VERBOSE_ENABLED) {
Invoker<?> invoker = context.getInvoker();
@@ -198,7 +204,10 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
}
return sb.toString();
};
- LOGGER.msg(msg).log(exceptionHandler.resolveLogLevel(t), t);
+ try {
+ LOGGER.msg(msg).log(exceptionHandler.resolveLogLevel(t), t);
+ } catch (Throwable ignored) {
+ }
}
protected void onError(Throwable throwable) {
@@ -261,14 +270,27 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
if (null != consumerAppName) {
inv.put(TripleHeaderEnum.CONSUMER_APP_NAME_KEY, consumerAppName);
}
+
// customizer RpcInvocation
- headerFilters.forEach(f -> f.invoke(invoker, inv));
+ HeaderFilter[] headerFilters =
+ UrlUtils.computeServiceAttribute(invoker.getUrl(),
HEADER_FILTERS_CACHE, this::loadHeaderFilters);
+ for (HeaderFilter headerFilter : headerFilters) {
+ headerFilter.invoke(invoker, inv);
+ }
initializeAltSvc(url);
return onBuildRpcInvocationCompletion(inv);
}
+ private HeaderFilter[] loadHeaderFilters(URL url) {
+ List<HeaderFilter> headerFilters = frameworkModel
+ .getExtensionLoader(HeaderFilter.class)
+ .getActivateExtension(url, CommonConstants.HEADER_FILTER_KEY);
+ LOGGER.info("Header filters for [{}] loaded: {}", url, headerFilters);
+ return headerFilters.toArray(new HeaderFilter[0]);
+ }
+
/**
* <a
href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Alt-Svc">Alt-Svc</a>
*/
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index aef98805c1..18eb6894d5 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
@@ -34,7 +35,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
@@ -63,15 +63,14 @@ public class GrpcCompositeCodec implements HttpMessageCodec
{
packableMethod = (PackableMethod) methodDescriptor;
return;
}
- Map<MethodDescriptor, PackableMethod> cacheMap =
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
- .getServiceMetadata()
- .getAttributeMap()
- .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new
ConcurrentHashMap<>());
- packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md ->
frameworkModel
- .getExtensionLoader(PackableMethodFactory.class)
-
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
- .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
- .create(methodDescriptor, url, mediaType));
+
+ packableMethod = UrlUtils.computeServiceAttribute(
+ url, PACKABLE_METHOD_CACHE, k -> new
ConcurrentHashMap<MethodDescriptor, PackableMethod>())
+ .computeIfAbsent(methodDescriptor, md -> frameworkModel
+ .getExtensionLoader(PackableMethodFactory.class)
+
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
+ .getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY))
+ .create(methodDescriptor, url, mediaType));
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index dadf049912..f705514759 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -119,7 +119,7 @@ public class DefaultHttp11ServerTransportListener
}
@Override
- public void close() throws Exception {
+ protected void onFinally(HttpInputMessage message) {
serverChannelObserver.close();
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 96a642f0a9..eeb9aad7df 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -216,7 +216,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
}
@Override
- public void close() throws Exception {
+ public void close() {
getServerChannelObserver().close();
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestConstants.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestConstants.java
index c6434979fb..fc36800534 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestConstants.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestConstants.java
@@ -25,7 +25,6 @@ public final class RestConstants {
public static final String REST_FILTER_KEY = "rest.filter";
public static final String EXTENSION_KEY = "extension";
- public static final String EXTENSIONS_ATTRIBUTE_KEY =
"restExtensionsAttributeKey";
public static final int DIALECT_BASIC = 0;
public static final int DIALECT_SPRING_MVC = 1;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
index eed8ac01f4..66f53fa94b 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.http12.HttpRequest;
import org.apache.dubbo.remoting.http12.HttpResponse;
import org.apache.dubbo.rpc.AppResponse;
@@ -58,6 +59,7 @@ public class RestExtensionExecutionFilter extends
RestFilterAdapter {
private static final Logger LOGGER =
LoggerFactory.getLogger(RestExtensionExecutionFilter.class);
private static final String KEY =
RestExtensionExecutionFilter.class.getSimpleName();
+ private static final String REST_FILTER_CACHE = "REST_FILTER_CACHE";
private final Map<RestFilter, RadixTree<Boolean>> filterTreeCache =
CollectionUtils.newConcurrentHashMap();
private final ApplicationModel applicationModel;
@@ -191,15 +193,10 @@ public class RestExtensionExecutionFilter extends
RestFilterAdapter {
}
private RestFilter[] getFilters(Invoker<?> invoker) {
- URL url = invoker.getUrl();
- return (RestFilter[]) url.getServiceModel()
- .getServiceMetadata()
- .getAttributeMap()
- .computeIfAbsent(RestConstants.EXTENSIONS_ATTRIBUTE_KEY, k ->
loadFilters(url));
+ return UrlUtils.computeServiceAttribute(invoker.getUrl(),
REST_FILTER_CACHE, this::loadFilters);
}
private RestFilter[] loadFilters(URL url) {
- LOGGER.info("Loading rest filters for {}", url);
List<RestFilter> extensions = new ArrayList<>();
// 1. load from extension config
@@ -228,6 +225,7 @@ public class RestExtensionExecutionFilter extends
RestFilterAdapter {
// 3. sorts by order
extensions.sort(Comparator.comparingInt(RestUtils::getPriority));
+ LOGGER.info("Rest filters for [{}] loaded: {}", url, extensions);
return extensions.toArray(new RestFilter[0]);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/groovy/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/RestProtocolTest.groovy
b/dubbo-rpc/dubbo-rpc-triple/src/test/groovy/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/RestProtocolTest.groovy
index 3466cb775e..4b3547d238 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/groovy/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/RestProtocolTest.groovy
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/groovy/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/RestProtocolTest.groovy
@@ -229,4 +229,19 @@ class RestProtocolTest extends BaseServiceTest {
'/pbServerStream?request={"service": "3"}' | 3
}
+ def "produce test"() {
+ given:
+ def request = new TestRequest(
+ path: path,
+ accept: accept
+ )
+ expect:
+ runner.post(request) == output
+ where:
+ path | accept | output
+ '/produceTest?name=world' | '' | 'world'
+ '/produceTest?name=world' | 'text/plain' | 'world'
+ '/produceTest?name=world' | 'application/json' |
'{"message":"Invoker not found","status":"404"}'
+ }
+
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoService.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoService.java
index 732fcde91e..8469f14080 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoService.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoService.java
@@ -68,4 +68,7 @@ public interface DemoService {
String argNameTest(String name);
void pbServerStream(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver);
+
+ @Mapping(produces = "text/plain")
+ String produceTest(String name);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoServiceImpl.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoServiceImpl.java
index 39c2f9795a..767cd7e684 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoServiceImpl.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/rest/service/DemoServiceImpl.java
@@ -143,4 +143,9 @@ public class DemoServiceImpl implements DemoService {
}
responseObserver.onCompleted();
}
+
+ @Override
+ public String produceTest(String name) {
+ return name;
+ }
}