This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 1c1101d57a077db48d5fc80f874a52680364ddc8 Author: ken.lj <[email protected]> AuthorDate: Thu Nov 7 14:23:57 2019 +0800 enhance AbstractCluster interceptor --- .../org/apache/dubbo/bootstrap/DubboBootstrap.java | 2 + .../org/apache/dubbo/config/ReferenceConfig.java | 3 +- .../cluster/interceptor}/ClusterInterceptor.java | 26 ++-- .../ConsumerContextClusterInterceptor.java | 118 ++++++++--------- .../interceptor}/ZoneAwareClusterInterceptor.java | 11 +- .../cluster/support/AbstractClusterInvoker.java | 7 +- .../rpc/cluster/support/BroadcastCluster.java | 9 +- .../dubbo/rpc/cluster/support/FailbackCluster.java | 9 +- .../dubbo/rpc/cluster/support/FailfastCluster.java | 9 +- .../dubbo/rpc/cluster/support/FailoverCluster.java | 9 +- .../dubbo/rpc/cluster/support/FailsafeCluster.java | 9 +- .../dubbo/rpc/cluster/support/ForkingCluster.java | 9 +- .../rpc/cluster/support/MergeableCluster.java | 7 +- .../cluster/support/registry/ZoneAwareCluster.java | 4 +- .../cluster/support/wrapper/AbstractCluster.java | 146 ++++++++++++--------- .../support/wrapper/MockClusterWrapper.java | 7 +- ...ubbo.rpc.cluster.interceptor.ClusterInterceptor | 2 + .../org/apache/dubbo/rpc/cluster/StickyTest.java | 4 +- .../loadbalance/AbstractLoadBalanceTest.java | 26 +++- .../router/condition/ConditionRouterTest.java | 2 +- .../support/FailoverClusterInvokerTest.java | 2 + .../dubbo/common/infra/support/CmdbAdapter.java | 5 +- .../manager/DefaultExecutorRepository.java | 28 +++- .../apache/dubbo/rpc/model/ApplicationModel.java | 39 ------ .../model/BuiltinServiceDetector.java} | 22 +--- .../apache/dubbo/rpc/model/ServiceDescriptor.java | 27 +++- .../apache/dubbo/rpc/model/ServiceRepository.java | 32 +++++ .../service/EchoServiceDetector.java} | 18 +-- .../service/GenericServiceDetector.java} | 20 +-- ...g.apache.dubbo.rpc.model.BuiltinServiceDetector | 2 + .../DynamicConfigurationFactoryTest.java | 6 +- .../FileSystemDynamicConfigurationFactoryTest.java | 5 +- dubbo-dependencies-bom/pom.xml | 2 +- .../monitor/support/MetricsServiceDetector.java | 21 +-- .../monitor/support/MonitorServiceDetector.java | 21 +-- ...g.apache.dubbo.rpc.model.BuiltinServiceDetector | 2 + .../dubbo/monitor/dubbo/DubboMonitorFactory.java | 2 - .../apache/dubbo/monitor/dubbo/MetricsFilter.java | 2 - .../dubbo/remoting/transport/AbstractChannel.java | 4 +- .../dubbo/remoting/transport/AbstractClient.java | 9 +- .../dispatcher/WrappedChannelHandler.java | 8 +- .../utils/{LogUtils.java => PayloadDropper.java} | 5 +- .../remoting/transport/netty/NettyChannel.java | 6 +- .../remoting/transport/netty/NettyServer.java | 2 +- .../support/header/HeartbeatHandlerTest.java | 4 +- .../remoting/transport/netty/ThreadNameTest.java | 13 +- .../remoting/transport/netty4/NettyChannel.java | 6 +- .../remoting/transport/netty4/NettyServer.java | 2 +- .../java/org/apache/dubbo/rpc/RpcInvocation.java | 35 +++-- .../ConsumerContextFilter.java} | 121 ++++++++--------- .../apache/dubbo/rpc/filter/ExceptionFilter.java | 5 + .../dubbo/rpc/protocol/ProtocolFilterWrapper.java | 2 + .../org/apache/dubbo/rpc/support/RpcUtils.java | 5 + .../org.apache.dubbo.rpc.ClusterInterceptor | 2 - .../dubbo/internal/org.apache.dubbo.rpc.Filter | 1 + .../dubbo/rpc/filter/ExceptionFilterTest.java | 1 + .../protocol/dubbo/DecodeableRpcInvocation.java | 27 ++-- .../rpc/protocol/dubbo/DubboProtocolTest.java | 16 +-- .../dubbo/rpc/protocol/dubbo/RpcFilterTest.java | 4 +- .../dubbo/rpc/protocol/dubbo/support/EnumBak.java | 1 + pom.xml | 1 + 61 files changed, 512 insertions(+), 443 deletions(-) diff --git a/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/bootstrap/DubboBootstrap.java b/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/bootstrap/DubboBootstrap.java index d537537..1c1bb87 100644 --- a/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/bootstrap/DubboBootstrap.java +++ b/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/bootstrap/DubboBootstrap.java @@ -1302,6 +1302,8 @@ public class DubboBootstrap extends GenericEventListener { String pathKey = URL.buildKey(sc.getContextPath(protocolConfig) .map(p -> p + "/" + sc.getPath()) .orElse(sc.getPath()), sc.getGroup(), sc.getVersion()); + // In case user specified path, register service one more time to map it to path. + repository.registerService(pathKey, sc.getInterfaceClass()); // TODO, uncomment this line once service key is unified sc.getServiceMetadata().setServiceKey(pathKey); exporters.addAll(doExportUrlsFor1Protocol(sc, protocolConfig, registryURLs)); diff --git a/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java index 654d5e9..6cfbbcb 100644 --- a/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java +++ b/dubbo-bootstrap/dubbo-bootstrap-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java @@ -24,7 +24,8 @@ import org.apache.dubbo.bootstrap.DubboBootstrap; */ @Deprecated public class ReferenceConfig<T> extends org.apache.dubbo.config.service.ReferenceConfig<T> { - DubboBootstrap bootstrap = DubboBootstrap.getInstance(); + + private DubboBootstrap bootstrap = DubboBootstrap.getInstance(); @Deprecated public synchronized T get() { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java similarity index 54% rename from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ClusterInterceptor.java rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java index 48cdcdd..5bf100b 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ClusterInterceptor.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java @@ -14,9 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.rpc; +package org.apache.dubbo.rpc.cluster.interceptor; import org.apache.dubbo.common.extension.SPI; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; /** * Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked. @@ -24,27 +29,28 @@ import org.apache.dubbo.common.extension.SPI; @SPI public interface ClusterInterceptor { - void before(Invoker<?> invoker, Invocation invocation); + void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation); - void after(Invoker<?> invoker, Invocation invocation); + void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation); /** - * Does not need to override this method, override {@link #before(Invoker, Invocation)} and {@link #after(Invoker, Invocation)} - * methods to add your own logic expected to be executed before and after invoke. + * Does not need to override this method, override {@link #before(AbstractClusterInvoker, Invocation)} + * and {@link #after(AbstractClusterInvoker, Invocation)}, methods to add your own logic expected to be + * executed before and after invoke. * - * @param invoker + * @param clusterInvoker * @param invocation * @return * @throws RpcException */ - default Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { - return invoker.invoke(invocation); + default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException { + return clusterInvoker.invoke(invocation); } interface Listener { - void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation); + void onResponse(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation); - void onError(Throwable t, Invoker<?> invoker, Invocation invocation); + void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation); } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java similarity index 60% copy from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ConsumerContextClusterInterceptor.java copy to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java index 333878d..f53f33c 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ConsumerContextClusterInterceptor.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java @@ -1,63 +1,55 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.interceptors; - -import org.apache.dubbo.common.extension.Activate; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.rpc.ClusterInterceptor; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcInvocation; - -import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY; - -@Activate -public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener { - - @Override - public void before(Invoker<?> invoker, Invocation invocation) { - RpcContext.getContext() - .setInvoker(invoker) - .setInvocation(invocation) - .setLocalAddress(NetUtils.getHostAddress(), 0) - .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()) - .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY)) - .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY)); - if (invocation instanceof RpcInvocation) { - ((RpcInvocation) invocation).setInvoker(invoker); - } - RpcContext.removeServerContext(); - } - - @Override - public void after(Invoker<?> invoker, Invocation invocation) { - RpcContext.removeContext(); - } - - @Override - public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { - RpcContext.getServerContext().setAttachments(appResponse.getAttachments()); - } - - @Override - public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.interceptor; + +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; + +@Activate +public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener { + + @Override + public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) { + RpcContext.getContext() + .setInvocation(invocation) + .setLocalAddress(NetUtils.getHostAddress(), 0); + if (invocation instanceof RpcInvocation) { + ((RpcInvocation) invocation).setInvoker(invoker); + } + RpcContext.removeServerContext(); + } + + @Override + public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) { + RpcContext.removeContext(); + } + + @Override + public void onResponse(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) { + RpcContext.getServerContext().setAttachments(appResponse.getAttachments()); + } + + @Override + public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) { + + } +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ZoneAwareClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java similarity index 86% rename from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ZoneAwareClusterInterceptor.java rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java index 7e256e0..6daec08 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ZoneAwareClusterInterceptor.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java @@ -14,28 +14,29 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.rpc.interceptors; +package org.apache.dubbo.rpc.cluster.interceptor; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.StringUtils; -import org.apache.dubbo.rpc.ClusterInterceptor; import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.ZoneDetector; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE; /** * Determines the zone information of current request. + * + * active only when url has key 'cluster=zone-aware' */ @Activate(value = "cluster:zone-aware") public class ZoneAwareClusterInterceptor implements ClusterInterceptor { @Override - public void before(Invoker<?> invoker, Invocation invocation) { + public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) { RpcContext rpcContext = RpcContext.getContext(); String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE); String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE); @@ -55,7 +56,7 @@ public class ZoneAwareClusterInterceptor implements ClusterInterceptor { } @Override - public void after(Invoker<?> invoker, Invocation invocation) { + public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) { } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java index 81f7ba8..d5437bf 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java @@ -53,14 +53,17 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> { private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class); - protected final Directory<T> directory; + protected Directory<T> directory; - protected final boolean availablecheck; + protected boolean availablecheck; private AtomicBoolean destroyed = new AtomicBoolean(false); private volatile Invoker<T> stickyInvoker = null; + public AbstractClusterInvoker() { + } + public AbstractClusterInvoker(Directory<T> directory) { this(directory, directory.getUrl()); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastCluster.java index c0cbdf7..f5cab05 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastCluster.java @@ -16,20 +16,19 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** * BroadcastCluster * */ -public class BroadcastCluster implements Cluster { +public class BroadcastCluster extends AbstractCluster { @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { - return new BroadcastClusterInvoker<T>(directory); + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { + return new BroadcastClusterInvoker<>(directory); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackCluster.java index 34c0199..4f4400b 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackCluster.java @@ -16,22 +16,21 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** * {@link FailbackClusterInvoker} * */ -public class FailbackCluster implements Cluster { +public class FailbackCluster extends AbstractCluster { public final static String NAME = "failback"; @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { - return new FailbackClusterInvoker<T>(directory); + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { + return new FailbackClusterInvoker<>(directory); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailfastCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailfastCluster.java index 5633603..14a8565 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailfastCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailfastCluster.java @@ -16,22 +16,21 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** * {@link FailfastClusterInvoker} * */ -public class FailfastCluster implements Cluster { +public class FailfastCluster extends AbstractCluster { public final static String NAME = "failfast"; @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { - return new FailfastClusterInvoker<T>(directory); + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { + return new FailfastClusterInvoker<>(directory); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailoverCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailoverCluster.java index 38d0f58..7f1bc54 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailoverCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailoverCluster.java @@ -16,22 +16,21 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** * {@link FailoverClusterInvoker} * */ -public class FailoverCluster implements Cluster { +public class FailoverCluster extends AbstractCluster { public final static String NAME = "failover"; @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { - return new FailoverClusterInvoker<T>(directory); + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { + return new FailoverClusterInvoker<>(directory); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java index eed0b5b..767ab31 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeCluster.java @@ -16,22 +16,21 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** * {@link FailsafeClusterInvoker} * */ -public class FailsafeCluster implements Cluster { +public class FailsafeCluster extends AbstractCluster { public final static String NAME = "failsafe"; @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { - return new FailsafeClusterInvoker<T>(directory); + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { + return new FailsafeClusterInvoker<>(directory); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingCluster.java index 3701d64..b28cffd 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingCluster.java @@ -16,22 +16,21 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** * {@link ForkingClusterInvoker} * */ -public class ForkingCluster implements Cluster { +public class ForkingCluster extends AbstractCluster { public final static String NAME = "forking"; @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { - return new ForkingClusterInvoker<T>(directory); + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { + return new ForkingClusterInvoker<>(directory); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java index 1e0f43d..720d21d 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableCluster.java @@ -16,17 +16,16 @@ */ package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; -public class MergeableCluster implements Cluster { +public class MergeableCluster extends AbstractCluster { public static final String NAME = "mergeable"; @Override - public <T> Invoker<T> join(Directory<T> directory) throws RpcException { + public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { return new MergeableClusterInvoker<T>(directory); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java index abd80f1..c11781e 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/registry/ZoneAwareCluster.java @@ -16,9 +16,9 @@ */ package org.apache.dubbo.rpc.cluster.support.registry; -import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; import org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster; /** @@ -29,7 +29,7 @@ public class ZoneAwareCluster extends AbstractCluster { public final static String NAME = "zone-aware"; @Override - protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException { + protected <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { return new ZoneAwareClusterInvoker<T>(directory); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java index b1b8c27..bb8297f 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java @@ -18,13 +18,15 @@ package org.apache.dubbo.rpc.cluster.support.wrapper; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.rpc.ClusterInterceptor; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.LoadBalance; +import org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; import java.util.List; @@ -32,70 +34,15 @@ import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_INTERC public abstract class AbstractCluster implements Cluster { - private <T> Invoker<T> buildClusterInterceptors(Invoker<T> invoker, String key) { - Invoker<T> last = invoker; - List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(invoker.getUrl(), key); + private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) { + AbstractClusterInvoker<T> last = clusterInvoker; + List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key); if (!interceptors.isEmpty()) { for (int i = interceptors.size() - 1; i >= 0; i--) { final ClusterInterceptor interceptor = interceptors.get(i); - final Invoker<T> next = last; - last = new Invoker<T>() { - - @Override - public Class<T> getInterface() { - return invoker.getInterface(); - } - - @Override - public URL getUrl() { - return invoker.getUrl(); - } - - @Override - public boolean isAvailable() { - return invoker.isAvailable(); - } - - @Override - public Result invoke(Invocation invocation) throws RpcException { - Result asyncResult; - try { - interceptor.before(next, invocation); - asyncResult = interceptor.invoke(next, invocation); - } catch (Exception e) { - // onError callback - if (interceptor instanceof ClusterInterceptor.Listener) { - ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; - listener.onError(e, invoker, invocation); - } - throw e; - } finally { - interceptor.after(next, invocation); - } - return asyncResult.whenCompleteWithContext((r, t) -> { - // onResponse callback - if (interceptor instanceof ClusterInterceptor.Listener) { - ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; - if (t == null) { - listener.onResponse(r, invoker, invocation); - } else { - listener.onError(t, invoker, invocation); - } - } - }); - } - - @Override - public void destroy() { - invoker.destroy(); - } - - @Override - public String toString() { - return invoker.toString(); - } - }; + final AbstractClusterInvoker<T> next = last; + last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next); } } return last; @@ -106,5 +53,80 @@ public abstract class AbstractCluster implements Cluster { return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY)); } - protected abstract <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException; + protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException; + + protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> { + + private AbstractClusterInvoker<T> clusterInvoker; + private ClusterInterceptor interceptor; + private AbstractClusterInvoker<T> next; + + public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker, + ClusterInterceptor interceptor, + AbstractClusterInvoker<T> next) { + this.clusterInvoker = clusterInvoker; + this.interceptor = interceptor; + this.next = next; + } + + @Override + public Class<T> getInterface() { + return clusterInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return clusterInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return clusterInvoker.isAvailable(); + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult; + try { + interceptor.before(next, invocation); + asyncResult = interceptor.intercept(next, invocation); + } catch (Exception e) { + // onError callback + if (interceptor instanceof ClusterInterceptor.Listener) { + ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; + listener.onError(e, clusterInvoker, invocation); + } + throw e; + } finally { + interceptor.after(next, invocation); + } + return asyncResult.whenCompleteWithContext((r, t) -> { + // onResponse callback + if (interceptor instanceof ClusterInterceptor.Listener) { + ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; + if (t == null) { + listener.onResponse(r, clusterInvoker, invocation); + } else { + listener.onError(t, clusterInvoker, invocation); + } + } + }); + } + + @Override + public void destroy() { + clusterInvoker.destroy(); + } + + @Override + public String toString() { + return clusterInvoker.toString(); + } + + @Override + protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { + // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter. + return null; + } + } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java index aba2438..cfe8cec 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterWrapper.java @@ -24,11 +24,8 @@ import org.apache.dubbo.rpc.cluster.Directory; /** * mock impl * - * Notice! Except for mock, this class also wraps the Invoker with {@link org.apache.dubbo.rpc.ClusterInterceptor} - * by extending {@link AbstractCluster#join(Directory)}. - * */ -public class MockClusterWrapper extends AbstractCluster { +public class MockClusterWrapper implements Cluster { private Cluster cluster; @@ -37,7 +34,7 @@ public class MockClusterWrapper extends AbstractCluster { } @Override - protected <T> Invoker<T> doJoin(Directory<T> directory) throws RpcException { + public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor new file mode 100644 index 0000000..3f3f008 --- /dev/null +++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor @@ -0,0 +1,2 @@ +context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterInterceptor +zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor \ No newline at end of file diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/StickyTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/StickyTest.java index 3414820..2862f2c 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/StickyTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/StickyTest.java @@ -75,14 +75,14 @@ public class StickyTest { @Test public void testStickyNoCheck() { - int count = testSticky(null, false); + int count = testSticky("t1", false); System.out.println(count); Assertions.assertTrue(count > 0 && count <= runs); } @Test public void testStickyForceCheck() { - int count = testSticky(null, true); + int count = testSticky("t2", true); Assertions.assertTrue(count == 0 || count == runs); } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java index 678fba7..eec4d89 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/loadbalance/AbstractLoadBalanceTest.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcInvocation; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -28,6 +29,8 @@ import java.util.HashMap; import java.util.List; import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY; +import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY; +import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; @@ -46,15 +49,34 @@ public class AbstractLoadBalanceTest { invocation.setMethodName("say"); Invoker invoker1 = mock(Invoker.class, Mockito.withSettings().stubOnly()); - URL url1 = new URL("", "", 0, new HashMap<>()); + URL url1 = new URL("", "", 0, "DemoService", new HashMap<>()); url1 = url1.addParameter(TIMESTAMP_KEY, System.currentTimeMillis() - Integer.MAX_VALUE - 1); given(invoker1.getUrl()).willReturn(url1); Invoker invoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly()); - URL url2 = new URL("", "", 0, new HashMap<>()); + URL url2 = new URL("", "", 0, "DemoService", new HashMap<>()); url2 = url2.addParameter(TIMESTAMP_KEY, System.currentTimeMillis() - 10 * 60 * 1000L - 1); given(invoker2.getUrl()).willReturn(url2); Assertions.assertEquals(balance.getWeight(invoker1, invocation), balance.getWeight(invoker2, invocation)); } + + @Test + public void testGetRegistryWeight() { + RpcInvocation invocation = new RpcInvocation(); + invocation.setMethodName("say"); + + Invoker invoker1 = mock(Invoker.class, Mockito.withSettings().stubOnly()); + URL url1 = new URL("", "", 0, "DemoService", new HashMap<>()); + url1 = url1.addParameter(REGISTRY_KEY + "." + WEIGHT_KEY, 10); + given(invoker1.getUrl()).willReturn(url1); + + Invoker invoker2 = mock(Invoker.class, Mockito.withSettings().stubOnly()); + URL url2 = new URL("", "", 0, "org.apache.dubbo.registry.RegistryService", new HashMap<>()); + url2 = url2.addParameter(REGISTRY_KEY + "." + WEIGHT_KEY, 20); + given(invoker2.getUrl()).willReturn(url2); + + Assertions.assertEquals(100, balance.getWeight(invoker1, invocation)); + Assertions.assertEquals(20, balance.getWeight(invoker2, invocation)); + } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java index 1d0ee37..00fdf53 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java @@ -88,7 +88,7 @@ public class ConditionRouterTest { public void testRoute_matchFilter() { List<Invoker<String>> invokers = new ArrayList<Invoker<String>>(); Invoker<String> invoker1 = new MockInvoker<String>(URL.valueOf( - "dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")); + "dubbo://10.20.3.3:20880/com.foo.BarService?serialization=fastjson")); Invoker<String> invoker2 = new MockInvoker<String>(URL.valueOf("dubbo://" + LOCAL_HOST + ":20880/com.foo.BarService")); Invoker<String> invoker3 = new MockInvoker<String>(URL.valueOf("dubbo://" + LOCAL_HOST diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java index 4c17fba..790d7cd 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java @@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.cluster.support; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.AppResponse; +import org.apache.dubbo.rpc.AsyncRpcResult; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; @@ -186,6 +187,7 @@ public class FailoverClusterInvokerTest { } invokers.clear(); MockInvoker<Demo> invoker3 = new MockInvoker<Demo>(Demo.class, url); + invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(null)); invokers.add(invoker3); return null; }; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java b/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java index 8b20895..180ae05 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java @@ -18,6 +18,7 @@ package org.apache.dubbo.common.infra.support; import org.apache.dubbo.common.infra.InfraAdapter; +import java.util.Collections; import java.util.Map; public class CmdbAdapter implements InfraAdapter { @@ -28,11 +29,11 @@ public class CmdbAdapter implements InfraAdapter { @Override public Map<String, String> getExtraAttributes(Map<String, String> params) { - return null; + return Collections.emptyMap(); } @Override public String getAttribute(String key) { - return null; + return ""; } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java index 1499056..f629c0d 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java @@ -61,13 +61,21 @@ public class DefaultExecutorRepository implements ExecutorRepository { // reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler")); } - public ExecutorService createExecutorIfAbsent(URL url) { + public synchronized ExecutorService createExecutorIfAbsent(URL url) { String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>()); - return executors.computeIfAbsent(url.getPort(), k -> (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)); + Integer portKey = url.getPort(); + ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url)); + // If executor has been shut down, create a new one + if (executor.isShutdown() || executor.isTerminated()) { + executors.remove(portKey); + executor = createExecutor(url); + executors.put(portKey, executor); + } + return executor; } public ExecutorService getExecutor(URL url) { @@ -79,7 +87,17 @@ public class DefaultExecutorRepository implements ExecutorRepository { if (executors == null) { return null; } - return executors.get(url.getPort()); + + Integer portKey = url.getPort(); + ExecutorService executor = executors.get(portKey); + if (executor != null) { + if (executor.isShutdown() || executor.isTerminated()) { + executors.remove(portKey); + executor = createExecutor(url); + executors.put(portKey, executor); + } + } + return executor; } @Override @@ -120,4 +138,8 @@ public class DefaultExecutorRepository implements ExecutorRepository { return SHARED_EXECUTOR; } + private ExecutorService createExecutor(URL url) { + return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); + } + } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java index 6cdfc9d..459ffb5 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java @@ -25,7 +25,6 @@ import org.apache.dubbo.config.ApplicationConfig; import org.apache.dubbo.config.context.ConfigManager; import java.util.Collection; -import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -76,44 +75,6 @@ public class ApplicationModel { return getServiceRepository().lookupReferredService(serviceKey); } -// public static void initProviderModel(String serviceName, ProviderModel providerModel) { -// if (PROVIDED_SERVICES.putIfAbsent(serviceName, providerModel) != null) { -// LOGGER.warn("Already register the same:" + serviceName); -// } -// } - -// public static ServiceDescriptor registerServiceModel(Class<?> interfaceClass) { -// return SERVICES.computeIfAbsent(interfaceClass.getName(), (k) -> new ServiceDescriptor(interfaceClass)); -// } - -// /** -// * See {@link #registerServiceModel(Class)} -// * -// * we assume: -// * 1. services with different interface are not allowed to have the same path. -// * 2. services with the same interface but different group/version can share the same path. -// * 3. path's default value is the name of the interface. -// * @param path -// * @param interfaceClass -// * @return -// */ -// public static ServiceDescriptor registerServiceModel(String path, Class<?> interfaceClass) { -// ServiceDescriptor serviceModel = registerServiceModel(interfaceClass); -// // register path -// if (!interfaceClass.getName().equals(path)) { -// SERVICES.putIfAbsent(path, serviceModel); -// } -// return serviceModel; -// } - - public static Optional<ServiceDescriptor> getServiceModel(String interfaceName) { - return Optional.ofNullable(getServiceRepository().lookupService(interfaceName)); - } - - public static Optional<ServiceDescriptor> getServiceModel(Class<?> interfaceClass) { - return Optional.ofNullable(getServiceRepository().lookupService(interfaceClass.getName())); - } - /** * instances **/ diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/BuiltinServiceDetector.java similarity index 65% copy from dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java copy to dubbo-common/src/main/java/org/apache/dubbo/rpc/model/BuiltinServiceDetector.java index 8b20895..06a5725 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/BuiltinServiceDetector.java @@ -14,25 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.common.infra.support; +package org.apache.dubbo.rpc.model; -import org.apache.dubbo.common.infra.InfraAdapter; +import org.apache.dubbo.common.extension.SPI; -import java.util.Map; +@SPI +public interface BuiltinServiceDetector { -public class CmdbAdapter implements InfraAdapter { + Class<?> getService(); - public CmdbAdapter() { - // init; - } - - @Override - public Map<String, String> getExtraAttributes(Map<String, String> params) { - return null; - } - - @Override - public String getAttribute(String key) { - return null; - } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java index 8bbbfa7..395baf8 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceDescriptor.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Set; /** @@ -77,24 +76,38 @@ public class ServiceDescriptor { return methodModels; } - public Optional<MethodDescriptor> getMethod(String methodName, String params) { + /** + * Does not use Optional as return type to avoid potential performance decrease. + * + * @param methodName + * @param params + * @return + */ + public MethodDescriptor getMethod(String methodName, String params) { Map<String, MethodDescriptor> methods = descToMethods.get(methodName); if (CollectionUtils.isNotEmptyMap(methods)) { - return Optional.ofNullable(methods.get(params)); + return methods.get(params); } - return Optional.empty(); + return null; } - public Optional<MethodDescriptor> getMethod(String methodName, Class<?>[] paramTypes) { + /** + * Does not use Optional as return type to avoid potential performance decrease. + * + * @param methodName + * @param paramTypes + * @return + */ + public MethodDescriptor getMethod(String methodName, Class<?>[] paramTypes) { Set<MethodDescriptor> methodModels = methods.get(methodName); if (CollectionUtils.isNotEmpty(methodModels)) { for (MethodDescriptor methodModel : methodModels) { if (Arrays.equals(paramTypes, methodModel.getParameterClasses())) { - return Optional.of(methodModel); + return methodModel; } } } - return Optional.empty(); + return null; } public Set<MethodDescriptor> getMethods(String methodName) { diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java index 19ae0d7..2141572 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ServiceRepository.java @@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.model; import org.apache.dubbo.common.context.FrameworkExt; import org.apache.dubbo.common.context.LifecycleAdapter; +import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.config.service.ReferenceConfig; import org.apache.dubbo.config.service.ServiceConfig; @@ -43,11 +44,42 @@ public class ServiceRepository extends LifecycleAdapter implements FrameworkExt // providers private ConcurrentMap<String, ProviderModel> providers = new ConcurrentHashMap<>(); + public ServiceRepository() { + Set<BuiltinServiceDetector> builtinServices + = ExtensionLoader.getExtensionLoader(BuiltinServiceDetector.class).getSupportedExtensionInstances(); + if (CollectionUtils.isNotEmpty(builtinServices)) { + for (BuiltinServiceDetector service : builtinServices) { + registerService(service.getService()); + } + } + } + public ServiceDescriptor registerService(Class<?> interfaceClazz) { return services.computeIfAbsent(interfaceClazz.getName(), _k -> new ServiceDescriptor(interfaceClazz)); } + /** + * See {@link #registerService(Class)} + * <p> + * we assume: + * 1. services with different interfaces are not allowed to have the same path. + * 2. services share the same interface but has different group/version can share the same path. + * 3. path's default value is the name of the interface. + * + * @param path + * @param interfaceClass + * @return + */ + public ServiceDescriptor registerService(String path, Class<?> interfaceClass) { + ServiceDescriptor serviceDescriptor = registerService(interfaceClass); + // if path is different with interface name, add extra path mapping + if (!interfaceClass.getName().equals(path)) { + services.putIfAbsent(path, serviceDescriptor); + } + return serviceDescriptor; + } + public void registerConsumer(String serviceKey, Map<String, Object> attributes, ServiceDescriptor serviceModel, diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/KubernetesAdapter.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/EchoServiceDetector.java similarity index 66% rename from dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/KubernetesAdapter.java rename to dubbo-common/src/main/java/org/apache/dubbo/rpc/service/EchoServiceDetector.java index 0d5afc3..fe3702a 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/KubernetesAdapter.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/EchoServiceDetector.java @@ -14,23 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.common.infra.support; +package org.apache.dubbo.rpc.service; -import org.apache.dubbo.common.extension.Activate; -import org.apache.dubbo.common.infra.InfraAdapter; +import org.apache.dubbo.rpc.model.BuiltinServiceDetector; -import java.util.Map; - -@Activate -public class KubernetesAdapter implements InfraAdapter { +public class EchoServiceDetector implements BuiltinServiceDetector { @Override - public Map<String, String> getExtraAttributes(Map<String, String> params) { - return null; + public Class<?> getService() { + return EchoService.class; } - @Override - public String getAttribute(String key) { - return null; - } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericServiceDetector.java similarity index 66% copy from dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java copy to dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericServiceDetector.java index 8b20895..29b950e 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/service/GenericServiceDetector.java @@ -14,25 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.common.infra.support; +package org.apache.dubbo.rpc.service; -import org.apache.dubbo.common.infra.InfraAdapter; +import org.apache.dubbo.rpc.model.BuiltinServiceDetector; -import java.util.Map; - -public class CmdbAdapter implements InfraAdapter { - - public CmdbAdapter() { - // init; - } +public class GenericServiceDetector implements BuiltinServiceDetector { @Override - public Map<String, String> getExtraAttributes(Map<String, String> params) { - return null; + public Class<?> getService() { + return GenericService.class; } - @Override - public String getAttribute(String key) { - return null; - } } diff --git a/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.BuiltinServiceDetector b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.BuiltinServiceDetector new file mode 100644 index 0000000..c5badd6 --- /dev/null +++ b/dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.BuiltinServiceDetector @@ -0,0 +1,2 @@ +echo=org.apache.dubbo.rpc.service.EchoServiceDetector +generic=org.apache.dubbo.rpc.service.GenericServiceDetector \ No newline at end of file diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/DynamicConfigurationFactoryTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/DynamicConfigurationFactoryTest.java index eacd3ee..8a4a1f8 100644 --- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/DynamicConfigurationFactoryTest.java +++ b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/DynamicConfigurationFactoryTest.java @@ -16,7 +16,7 @@ */ package org.apache.dubbo.common.config.configcenter; -import org.apache.dubbo.common.config.configcenter.file.FileSystemDynamicConfigurationFactory; +import org.apache.dubbo.common.config.configcenter.nop.NopDynamicConfigurationFactory; import org.junit.jupiter.api.Test; @@ -33,7 +33,7 @@ public class DynamicConfigurationFactoryTest { @Test public void testDefaultExtension() { DynamicConfigurationFactory factory = getExtensionLoader(DynamicConfigurationFactory.class).getDefaultExtension(); - assertEquals(FileSystemDynamicConfigurationFactory.class, factory.getClass()); - assertEquals(factory, getExtensionLoader(DynamicConfigurationFactory.class).getExtension("file")); + assertEquals(NopDynamicConfigurationFactory.class, factory.getClass()); + assertEquals(factory, getExtensionLoader(DynamicConfigurationFactory.class).getExtension("nop")); } } diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationFactoryTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationFactoryTest.java index ccd368d..37e37b1 100644 --- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationFactoryTest.java +++ b/dubbo-common/src/test/java/org/apache/dubbo/common/config/configcenter/file/FileSystemDynamicConfigurationFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.dubbo.common.config.configcenter.file; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory; +import org.apache.dubbo.common.config.configcenter.nop.NopDynamicConfiguration; import org.junit.jupiter.api.Test; @@ -33,8 +34,8 @@ public class FileSystemDynamicConfigurationFactoryTest { @Test public void testGetFactory() { DynamicConfigurationFactory factory = DynamicConfigurationFactory.getDynamicConfigurationFactory("not-exists"); - assertEquals(factory, DynamicConfigurationFactory.getDynamicConfigurationFactory("file")); + assertEquals(factory, DynamicConfigurationFactory.getDynamicConfigurationFactory("nop")); assertEquals(factory.getDynamicConfiguration(URL.valueOf("dummy")), factory.getDynamicConfiguration(URL.valueOf("dummy"))); - assertEquals(FileSystemDynamicConfiguration.class, factory.getDynamicConfiguration(URL.valueOf("dummy")).getClass()); + assertEquals(NopDynamicConfiguration.class, factory.getDynamicConfiguration(URL.valueOf("dummy")).getClass()); } } diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 5e322c5..db45ba0 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -127,7 +127,7 @@ <rs_api_version>2.0</rs_api_version> <resteasy_version>3.0.19.Final</resteasy_version> <tomcat_embed_version>8.5.31</tomcat_embed_version> - <jetcd_version>0.3.0</jetcd_version> + <jetcd_version>0.4.1</jetcd_version> <nacos_version>1.1.1</nacos_version> <grpc.version>1.22.1</grpc.version> <!-- Log libs --> diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MetricsServiceDetector.java similarity index 66% copy from dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java copy to dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MetricsServiceDetector.java index 8b20895..3b9fa15 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MetricsServiceDetector.java @@ -14,25 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.common.infra.support; +package org.apache.dubbo.monitor.support; -import org.apache.dubbo.common.infra.InfraAdapter; +import org.apache.dubbo.monitor.MetricsService; +import org.apache.dubbo.rpc.model.BuiltinServiceDetector; -import java.util.Map; - -public class CmdbAdapter implements InfraAdapter { - - public CmdbAdapter() { - // init; - } +public class MetricsServiceDetector implements BuiltinServiceDetector { @Override - public Map<String, String> getExtraAttributes(Map<String, String> params) { - return null; + public Class<?> getService() { + return MetricsService.class; } - @Override - public String getAttribute(String key) { - return null; - } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorServiceDetector.java similarity index 66% copy from dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java copy to dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorServiceDetector.java index 8b20895..3ae4e10 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/infra/support/CmdbAdapter.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorServiceDetector.java @@ -14,25 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.common.infra.support; +package org.apache.dubbo.monitor.support; -import org.apache.dubbo.common.infra.InfraAdapter; +import org.apache.dubbo.monitor.MonitorService; +import org.apache.dubbo.rpc.model.BuiltinServiceDetector; -import java.util.Map; - -public class CmdbAdapter implements InfraAdapter { - - public CmdbAdapter() { - // init; - } +public class MonitorServiceDetector implements BuiltinServiceDetector { @Override - public Map<String, String> getExtraAttributes(Map<String, String> params) { - return null; + public Class<?> getService() { + return MonitorService.class; } - @Override - public String getAttribute(String key) { - return null; - } } diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.BuiltinServiceDetector b/dubbo-monitor/dubbo-monitor-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.BuiltinServiceDetector new file mode 100644 index 0000000..ed8a80c --- /dev/null +++ b/dubbo-monitor/dubbo-monitor-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.model.BuiltinServiceDetector @@ -0,0 +1,2 @@ +monitor=org.apache.dubbo.monitor.support.MonitorServiceDetector +metrics=org.apache.dubbo.monitor.support.MetricsServiceDetector \ No newline at end of file diff --git a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java index 94b058a..3ca881f 100644 --- a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java +++ b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/DubboMonitorFactory.java @@ -25,7 +25,6 @@ import org.apache.dubbo.monitor.support.AbstractMonitorFactory; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.ProxyFactory; -import org.apache.dubbo.rpc.model.ApplicationModel; import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL; import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; @@ -64,7 +63,6 @@ public class DubboMonitorFactory extends AbstractMonitorFactory { } urlBuilder.addParameters(CHECK_KEY, String.valueOf(false), REFERENCE_FILTER_KEY, filter + "-monitor"); - ApplicationModel.getServiceRepository().registerService(MonitorService.class); Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build()); MonitorService monitorService = proxyFactory.getProxy(monitorInvoker); return new DubboMonitor(monitorInvoker, monitorService); diff --git a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java index d4f3f56..a0e58eb 100644 --- a/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java +++ b/dubbo-monitor/dubbo-monitor-default/src/main/java/org/apache/dubbo/monitor/dubbo/MetricsFilter.java @@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.support.RpcUtils; import com.alibaba.fastjson.JSON; @@ -89,7 +88,6 @@ public class MetricsFilter implements Filter { Invoker<MetricsService> metricsInvoker = initMetricsInvoker(); try { - ApplicationModel.getServiceRepository().registerService(MetricsService.class); protocol.export(metricsInvoker); } catch (RuntimeException e) { logger.error("Metrics Service need to be configured" + diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java index 06e86a0..c993eec 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java @@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; -import org.apache.dubbo.remoting.utils.LogUtils; +import org.apache.dubbo.remoting.utils.PayloadDropper; /** * AbstractChannel @@ -35,7 +35,7 @@ public abstract class AbstractChannel extends AbstractPeer implements Channel { public void send(Object message, boolean sent) throws RemotingException { if (isClosed()) { throw new RemotingException(this, "Failed to send message " - + (message == null ? "" : message.getClass().getName()) + ":" + LogUtils.getRequestWithoutData(message) + + (message == null ? "" : message.getClass().getName()) + ":" + PayloadDropper.getRequestWithoutData(message) + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress()); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 0241c29..8f6bf61 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -56,6 +56,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); + initExecutor(url); + try { doOpen(); } catch (Throwable t) { @@ -84,12 +86,15 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } - executor = executorRepository.createExecutorIfAbsent(url); } - protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { + private void initExecutor(URL url) { url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); + executor = executorRepository.createExecutorIfAbsent(url); + } + + protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { return ChannelHandlers.wrap(handler, url); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java index a709a45..49d099d 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java @@ -131,7 +131,13 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate { * @return */ public ExecutorService getSharedExecutorService() { - return ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().createExecutorIfAbsent(url); + ExecutorRepository executorRepository = + ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); + ExecutorService executor = executorRepository.getExecutor(url); + if (executor == null) { + executor = executorRepository.createExecutorIfAbsent(url); + } + return executor; } @Deprecated diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/LogUtils.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/PayloadDropper.java similarity index 93% rename from dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/LogUtils.java rename to dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/PayloadDropper.java index d35feb0..335ef7b 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/LogUtils.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/utils/PayloadDropper.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.dubbo.remoting.utils; import org.apache.dubbo.common.logger.Logger; @@ -22,8 +21,8 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.exchange.Response; -public class LogUtils { - private static Logger logger = LoggerFactory.getLogger(LogUtils.class); +public class PayloadDropper { + private static Logger logger = LoggerFactory.getLogger(PayloadDropper.class); /** * only log body in debugger mode for size & security consideration. diff --git a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyChannel.java index 15e5f11..c13bc29 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyChannel.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyChannel.java @@ -22,8 +22,8 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.transport.AbstractChannel; +import org.apache.dubbo.remoting.utils.PayloadDropper; -import org.apache.dubbo.remoting.utils.LogUtils; import org.jboss.netty.channel.ChannelFuture; import java.net.InetSocketAddress; @@ -110,11 +110,11 @@ final class NettyChannel extends AbstractChannel { throw cause; } } catch (Throwable e) { - throw new RemotingException(this, "Failed to send message " + LogUtils.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); + throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { - throw new RemotingException(this, "Failed to send message " + LogUtils.getRequestWithoutData(message) + " to " + getRemoteAddress() + throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } } diff --git a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyServer.java index 775d2f6..0eac68d 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyServer.java @@ -62,7 +62,7 @@ public class NettyServer extends AbstractServer implements RemotingServer { private org.jboss.netty.channel.Channel channel; public NettyServer(URL url, ChannelHandler handler) throws RemotingException { - super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); + super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } @Override diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java index 0448409..7e88e7c 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java @@ -82,7 +82,7 @@ public class HeartbeatHandlerTest { @Test public void testHeartbeat() throws Exception { - URL serverURL = URL.valueOf("header://localhost:55555?transporter=netty3"); + URL serverURL = URL.valueOf("header://localhost:55556?transporter=netty3"); serverURL = serverURL.addParameter(Constants.HEARTBEAT_KEY, 1000); TestHeartbeatHandler handler = new TestHeartbeatHandler(); server = Exchangers.bind(serverURL, handler); @@ -99,7 +99,7 @@ public class HeartbeatHandlerTest { @Test public void testClientHeartbeat() throws Exception { FakeChannelHandlers.setTestingChannelHandlers(); - URL serverURL = URL.valueOf("header://localhost:55555?transporter=netty3"); + URL serverURL = URL.valueOf("header://localhost:55557?transporter=netty3"); TestHeartbeatHandler handler = new TestHeartbeatHandler(); server = Exchangers.bind(serverURL, handler); System.out.println("Server bind successfully"); diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java index 684d525..ae743ba 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ThreadNameTest.java @@ -37,14 +37,17 @@ public class ThreadNameTest { private ThreadNameVerifyHandler serverHandler; private ThreadNameVerifyHandler clientHandler; + private static String serverRegex = "DubboServerHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)"; + private static String clientRegex = "DubboClientHandler\\-localhost:(\\d+)\\-thread\\-(\\d+)"; + @BeforeEach public void before() throws Exception { int port = 55555; - serverURL = URL.valueOf("netty://localhost").setPort(port); - clientURL = URL.valueOf("netty://localhost").setPort(port); + serverURL = URL.valueOf("netty://localhost?side=provider").setPort(port); + clientURL = URL.valueOf("netty://localhost?side=consumer").setPort(port); - serverHandler = new ThreadNameVerifyHandler(String.valueOf(port), false); - clientHandler = new ThreadNameVerifyHandler(String.valueOf(port), true); + serverHandler = new ThreadNameVerifyHandler(serverRegex, false); + clientHandler = new ThreadNameVerifyHandler(clientRegex, true); server = new NettyServer(serverURL, serverHandler); client = new NettyClient(clientURL, clientHandler); @@ -89,7 +92,7 @@ public class ThreadNameTest { private void checkThreadName() { if (!success) { - success = Thread.currentThread().getName().contains(message); + success = Thread.currentThread().getName().matches(message); } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java index 7630ef0..27b70d4 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java @@ -22,10 +22,10 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.transport.AbstractChannel; +import org.apache.dubbo.remoting.utils.PayloadDropper; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import org.apache.dubbo.remoting.utils.LogUtils; import java.net.InetSocketAddress; import java.util.Map; @@ -171,10 +171,10 @@ final class NettyChannel extends AbstractChannel { } } catch (Throwable e) { removeChannelIfDisconnected(channel); - throw new RemotingException(this, "Failed to send message " + LogUtils.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); + throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { - throw new RemotingException(this, "Failed to send message " + LogUtils.getRequestWithoutData(message) + " to " + getRemoteAddress() + throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index 0a00d35..8f19e4e 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -77,7 +77,7 @@ public class NettyServer extends AbstractServer implements RemotingServer { public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler - super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); + super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } /** diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java index 978acbe..3e5db8b 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java @@ -17,8 +17,13 @@ package org.apache.dubbo.rpc; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.rpc.model.ApplicationModel; +import org.apache.dubbo.rpc.model.MethodDescriptor; +import org.apache.dubbo.rpc.model.ServiceDescriptor; +import org.apache.dubbo.rpc.model.ServiceRepository; +import org.apache.dubbo.rpc.support.RpcUtils; import java.io.Serializable; import java.lang.reflect.Method; @@ -26,6 +31,7 @@ import java.lang.reflect.Type; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.stream.Stream; import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; @@ -125,19 +131,32 @@ public class RpcInvocation implements Invocation, Serializable { public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments, Map<String, Object> attachments, Invoker<?> invoker) { this.methodName = methodName; + this.serviceName = serviceName; this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes; this.arguments = arguments == null ? new Object[0] : arguments; this.attachments = attachments == null ? new HashMap<String, Object>() : attachments; this.invoker = invoker; + initParameterDesc(); + } + + private void initParameterDesc() { + ServiceRepository repository = ApplicationModel.getServiceRepository(); if (StringUtils.isNotEmpty(serviceName)) { - ApplicationModel.getServiceModel(serviceName).ifPresent(serviceModel -> - serviceModel.getMethod(methodName, parameterTypes) - .ifPresent(methodModel -> { - this.parameterTypesDesc = methodModel.getParamDesc(); - this.compatibleParamSignatures = methodModel.getCompatibleParamSignatures(); - this.returnTypes = methodModel.getReturnTypes(); - }) - ); + ServiceDescriptor serviceDescriptor = repository.lookupService(serviceName); + if (serviceDescriptor != null) { + MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(methodName, parameterTypes); + if (methodDescriptor != null) { + this.parameterTypesDesc = methodDescriptor.getParamDesc(); + this.compatibleParamSignatures = methodDescriptor.getCompatibleParamSignatures(); + this.returnTypes = methodDescriptor.getReturnTypes(); + } + } + } + + if (parameterTypesDesc == null) { + this.parameterTypesDesc = ReflectUtils.getDesc(this.getParameterTypes()); + this.compatibleParamSignatures = Stream.of(this.parameterTypes).map(Class::getName).toArray(String[]::new); + this.returnTypes = RpcUtils.getReturnTypes(this); } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ConsumerContextClusterInterceptor.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java similarity index 67% rename from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ConsumerContextClusterInterceptor.java rename to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java index 333878d..c935398 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/interceptors/ConsumerContextClusterInterceptor.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java @@ -1,63 +1,58 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.interceptors; - -import org.apache.dubbo.common.extension.Activate; -import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.rpc.ClusterInterceptor; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcInvocation; - -import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY; - -@Activate -public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener { - - @Override - public void before(Invoker<?> invoker, Invocation invocation) { - RpcContext.getContext() - .setInvoker(invoker) - .setInvocation(invocation) - .setLocalAddress(NetUtils.getHostAddress(), 0) - .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()) - .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY)) - .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY)); - if (invocation instanceof RpcInvocation) { - ((RpcInvocation) invocation).setInvoker(invoker); - } - RpcContext.removeServerContext(); - } - - @Override - public void after(Invoker<?> invoker, Invocation invocation) { - RpcContext.removeContext(); - } - - @Override - public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { - RpcContext.getServerContext().setAttachments(appResponse.getAttachments()); - } - - @Override - public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.filter; + +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.rpc.Filter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; + +import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; +import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY; + +/** + * ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port + * for consumer invoker.It does it to make the requires info available to execution thread's RpcContext. + * + * @see org.apache.dubbo.rpc.Filter + * @see RpcContext + */ +@Activate(group = CONSUMER, order = -10000) +public class ConsumerContextFilter implements Filter { + + @Override + public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { + RpcContext.getContext() + .setInvoker(invoker) + .setInvocation(invocation) + .setLocalAddress(NetUtils.getLocalHost(), 0) + .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()) + .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY)) + .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY)); + if (invocation instanceof RpcInvocation) { + ((RpcInvocation) invocation).setInvoker(invoker); + } + return invoker.invoke(invocation); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExceptionFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExceptionFilter.java index c70e1b1..f0442ca 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExceptionFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExceptionFilter.java @@ -106,5 +106,10 @@ public class ExceptionFilter implements Filter, Filter.Listener { public void onError(Throwable e, Invoker<?> invoker, Invocation invocation) { logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e); } + + // For test purpose + public void setLogger(Logger logger) { + this.logger = logger; + } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java index a8d8e45..ba221ed 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java @@ -90,6 +90,8 @@ public class ProtocolFilterWrapper implements Protocol { listener.onError(e, invoker, invocation); } throw e; + } finally { + } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) {// Deprecated! diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java index 7415928..e1d85b7 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE; import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE_ASYNC; import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_INVOCATION_PREFIX; +import static org.apache.dubbo.rpc.Constants.$ECHO; import static org.apache.dubbo.rpc.Constants.ASYNC_KEY; import static org.apache.dubbo.rpc.Constants.AUTO_ATTACH_INVOCATIONID_KEY; import static org.apache.dubbo.rpc.Constants.ID_KEY; @@ -191,6 +192,10 @@ public class RpcUtils { return $INVOKE.equals(method) || $INVOKE_ASYNC.equals(method); } + public static boolean isEcho(String path, String method) { + return $ECHO.equals(method); + } + public static InvokeMode getInvokeMode(URL url, Invocation inv) { if (isReturnTypeFuture(inv)) { return InvokeMode.FUTURE; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.ClusterInterceptor b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.ClusterInterceptor deleted file mode 100644 index 2b60b1a..0000000 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.ClusterInterceptor +++ /dev/null @@ -1,2 +0,0 @@ -consumer-context=org.apache.dubbo.rpc.interceptors.ConsumerContextClusterInterceptor -zone-aware=org.apache.dubbo.rpc.interceptors.ZoneAwareClusterInterceptor \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 1cc6181..376f966 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -6,6 +6,7 @@ accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter context=org.apache.dubbo.rpc.filter.ContextFilter +consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter exception=org.apache.dubbo.rpc.filter.ExceptionFilter executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java index 827fbdf..d47a90b 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java @@ -60,6 +60,7 @@ public class ExceptionFilterTest { exceptionFilter.invoke(invoker, invocation); } catch (RpcException e) { assertEquals("TestRpcException", e.getMessage()); + exceptionFilter.setLogger(logger); exceptionFilter.onError(e, invoker, invocation); } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java index d72a878..c11df3b 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java @@ -33,14 +33,13 @@ import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.model.ServiceDescriptor; -import org.apache.dubbo.rpc.support.RpcUtils; +import org.apache.dubbo.rpc.model.ServiceRepository; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import static org.apache.dubbo.common.URL.buildKey; import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY; @@ -117,18 +116,22 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY; Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY; if (desc.length() > 0) { - if (RpcUtils.isGenericCall(path, getMethodName())) { - pts = ReflectUtils.desc2classArray(desc); - } else { - Optional<ServiceDescriptor> serviceModel = ApplicationModel.getServiceModel(path); - if (serviceModel.isPresent()) { - Optional<MethodDescriptor> methodOptional = serviceModel.get().getMethod(getMethodName(), desc); - if (methodOptional.isPresent()) { - pts = methodOptional.get().getParameterClasses(); - this.setReturnTypes(methodOptional.get().getReturnTypes()); - } +// if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) { +// pts = ReflectUtils.desc2classArray(desc); +// } else { + ServiceRepository repository = ApplicationModel.getServiceRepository(); + ServiceDescriptor serviceDescriptor = repository.lookupService(path); + if (serviceDescriptor != null) { + MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc); + if (methodDescriptor != null) { + pts = methodDescriptor.getParameterClasses(); + this.setReturnTypes(methodDescriptor.getReturnTypes()); } } + if (pts == DubboCodec.EMPTY_CLASS_ARRAY) { + pts = ReflectUtils.desc2classArray(desc); + } +// } args = new Object[pts.length]; for (int i = 0; i < args.length; i++) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java index b692fb1..16bceb1 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolTest.java @@ -138,25 +138,23 @@ public class DubboProtocolTest { @Test public void testDubboProtocolMultiService() throws Exception { - DemoService service = new DemoServiceImpl(); - protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName()))); - service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", - 3000L))); +// DemoService service = new DemoServiceImpl(); +// protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName()))); +// service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", +// 3000L))); RemoteService remote = new RemoteServiceImpl(); protocol.export(proxy.getInvoker(remote, RemoteService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + RemoteService.class.getName()))); remote = proxy.getProxy(protocol.refer(RemoteService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + RemoteService.class.getName()).addParameter("timeout", 3000L))); - service.sayHello("world"); +// service.sayHello("world"); // test netty client - assertEquals("world", service.echo("world")); +// assertEquals("world", service.echo("world")); assertEquals("hello world@" + RemoteServiceImpl.class.getName(), remote.sayHello("world")); - EchoService serviceEcho = (EchoService) service; - assertEquals(serviceEcho.$echo("test"), "test"); - +// can't find target service addresses EchoService remoteEecho = (EchoService) remote; assertEquals(remoteEecho.$echo("ok"), "ok"); } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java index 9565247..28f5df9 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/RpcFilterTest.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.ProxyFactory; +import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService; import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl; import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils; @@ -41,7 +42,8 @@ public class RpcFilterTest { @Test public void testRpcFilter() throws Exception { DemoService service = new DemoServiceImpl(); - URL url = URL.valueOf("dubbo://127.0.0.1:9010/org.apache.dubbo.rpc.DemoService?service.filter=echo"); + URL url = URL.valueOf("dubbo://127.0.0.1:9010/org.apache.dubbo.rpc.protocol.dubbo.support.DemoService?service.filter=echo"); + ApplicationModel.getServiceRepository().registerService(DemoService.class); protocol.export(proxy.getInvoker(service, DemoService.class, url)); service = proxy.getProxy(protocol.refer(DemoService.class, url)); Assertions.assertEquals("123", service.echo("123")); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/support/EnumBak.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/support/EnumBak.java index 6ef0255..2af451f 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/support/EnumBak.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/support/EnumBak.java @@ -52,6 +52,7 @@ public class EnumBak { Invoker<DemoService> reference = protocol.refer(DemoService.class, consumerurl); DemoService demoProxy = (DemoService) proxy.getProxy(reference); // System.out.println(demoProxy.getThreadName()); + System.out.println(demoProxy.getbyte((byte) -128)); Assertions.assertEquals((byte) -128, demoProxy.getbyte((byte) -128)); // invoker.destroy(); diff --git a/pom.xml b/pom.xml index e65ca43..39de94b 100644 --- a/pom.xml +++ b/pom.xml @@ -533,6 +533,7 @@ <!-- exclude the edazdarevic files --> <exclude>**/org/apache/dubbo/common/utils/CIDRUtils.java</exclude> <exclude>.github/**</exclude> + <exclude>compiler/**</exclude> </excludes> </configuration> </execution>
