This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 72326f42ba Local references support mergeable (#9645)
72326f42ba is described below
commit 72326f42ba2add37b5f22541ad40fc3644dbe156
Author: huazhongming <[email protected]>
AuthorDate: Thu Sep 8 10:08:33 2022 +0800
Local references support mergeable (#9645)
Fix https://github.com/apache/dubbo/issues/8777
---
.../org/apache/dubbo/config/ReferenceConfig.java | 6 +--
.../apache/dubbo/config/ReferenceConfigTest.java | 7 ++-
.../dubbo/registry/multiple/MultipleRegistry.java | 2 +-
.../dubbo/rpc/protocol/injvm/InjvmInvoker.java | 9 ++--
.../dubbo/rpc/protocol/injvm/InjvmProtocol.java | 52 +++++++++++++++++++++-
.../rpc/protocol/injvm/Hello1ServiceImpl.java | 31 +++++++++++++
.../rpc/protocol/injvm/Hello2ServiceImpl.java | 33 ++++++++++++++
.../dubbo/rpc/protocol/injvm/HelloService.java | 24 ++++++++++
.../rpc/protocol/injvm/InjvmProtocolTest.java | 40 ++++++++++++++---
9 files changed, 181 insertions(+), 23 deletions(-)
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 6d5e3163a8..931adce942 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -448,11 +448,7 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName(), referenceParameters);
url = url.setScopeModel(getScopeModel());
url = url.setServiceModel(consumerModel);
- Invoker<?> withFilter = protocolSPI.refer(interfaceClass, url);
- // Local Invoke ( Support Cluster Filter / Filter )
- List<Invoker<?>> invokers = new ArrayList<>();
- invokers.add(withFilter);
- invoker = Cluster.getCluster(url.getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(url, invokers), true);
+ invoker = protocolSPI.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using in jvm service " + interfaceClass.getName());
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java
index 7d34883629..41943fc587 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java
@@ -477,10 +477,9 @@ public class ReferenceConfigTest {
.initialize();
referenceConfig.init();
- Assertions.assertTrue(referenceConfig.getInvoker() instanceof MockClusterInvoker);
- Invoker<?> withFilter = ((MockClusterInvoker<?>) referenceConfig.getInvoker()).getDirectory().getAllInvokers().get(0);
- Assertions.assertTrue(withFilter instanceof ListenerInvokerWrapper);
- Assertions.assertTrue(((ListenerInvokerWrapper<?>) withFilter).getInvoker() instanceof InjvmInvoker);
+ Invoker<?> withFilter = ((ListenerInvokerWrapper<?>) referenceConfig.getInvoker()).getInvoker();
+ withFilter = ((MockClusterInvoker<?>) withFilter).getDirectory().getAllInvokers().get(0);
+ Assertions.assertTrue(withFilter instanceof InjvmInvoker);
URL url = withFilter.getUrl();
Assertions.assertEquals("application1", url.getParameter("application"));
Assertions.assertEquals("value1", url.getParameter("key1"));
diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
index 0b1e5d923e..be9c9c92ff 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
@@ -253,7 +253,7 @@ public class MultipleRegistry extends AbstractRegistry {
protected static class MultipleNotifyListenerWrapper implements NotifyListener {
- Map<URL, SingleNotifyListener> registryMap = new ConcurrentHashMap<URL, SingleNotifyListener>(4);
+ Map<URL, SingleNotifyListener> registryMap = new ConcurrentHashMap<>(4);
NotifyListener sourceNotifyListener;
public MultipleNotifyListenerWrapper(NotifyListener sourceNotifyListener) {
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
index 719e036d84..18c08e8236 100644
--- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
@@ -42,7 +42,6 @@ import org.apache.dubbo.rpc.support.RpcUtils;
import java.lang.reflect.Type;
import java.util.HashMap;
-import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -63,7 +62,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
private final String key;
- private final Map<String, Exporter<?>> exporterMap;
+ private final Exporter<?> exporter;
private final ExecutorRepository executorRepository;
@@ -71,10 +70,10 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
private final boolean shouldIgnoreSameModule;
- InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
+ InjvmInvoker(Class<T> type, URL url, String key, Exporter<?> exporter) {
super(type, url);
this.key = key;
- this.exporterMap = exporterMap;
+ this.exporter = exporter;
this.executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
this.paramDeepCopyUtil = url.getOrDefaultFrameworkModel().getExtensionLoader(ParamDeepCopyUtil.class)
.getExtension(url.getParameter(CommonConstants.INJVM_COPY_UTIL_KEY, DefaultParamDeepCopyUtil.NAME));
@@ -83,7 +82,6 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
@Override
public boolean isAvailable() {
- InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
if (exporter == null) {
return false;
} else {
@@ -93,7 +91,6 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
@Override
public Result doInvoke(Invocation invocation) throws Throwable {
- Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
index 75a51f37be..fdc9e101f6 100644
--- a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
@@ -18,19 +18,29 @@ package org.apache.dubbo.rpc.protocol.injvm;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
+import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
+import org.apache.dubbo.rpc.cluster.support.MergeableCluster;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.support.ProtocolUtils;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.BROADCAST_CLUSTER;
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
@@ -69,7 +79,7 @@ public class InjvmProtocol extends AbstractProtocol {
if (result == null) {
return null;
} else if (ProtocolUtils.isGeneric(
- result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
+ result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
return null;
} else {
return result;
@@ -88,7 +98,15 @@ public class InjvmProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
- return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
+ // group="a,b" or group="*"
+ String group = url.getParameter(GROUP_KEY);
+ if (StringUtils.isNotEmpty(group)) {
+ if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
+ return doCreateInvoker(url, Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), serviceType);
+ }
+ }
+ Cluster cluster = Cluster.getCluster(url.getScopeModel(), url.getParameter(CLUSTER_KEY));
+ return doCreateInvoker(url, cluster, serviceType);
}
public boolean isInjvmRefer(URL url) {
@@ -116,4 +134,34 @@ public class InjvmProtocol extends AbstractProtocol {
return false;
}
}
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected <T> ClusterInvoker<T> doCreateInvoker(URL url, Cluster cluster, Class<T> type) {
+ StaticDirectory directory = new StaticDirectory(url, getInvokers(exporterMap, url, type));
+ return (ClusterInvoker<T>) cluster.join(directory, true);
+ }
+
+ private <T> List<Invoker<T>> getInvokers(Map<String, Exporter<?>> map, URL url, Class<T> type) {
+ List<Invoker<T>> result = new ArrayList<>();
+
+ if (!url.getServiceKey().contains("*")) {
+ Exporter<?> exporter = map.get(url.getServiceKey());
+ InjvmInvoker<T> invoker = new InjvmInvoker<>(type, url, url.getServiceKey(), exporter);
+ result.add(invoker);
+ } else {
+ if (CollectionUtils.isNotEmptyMap(map)) {
+ for (Exporter<?> exporter : map.values()) {
+ if (UrlUtils.isServiceKeyMatch(url, exporter.getInvoker().getUrl())) {
+ URL providerUrl = exporter.getInvoker().getUrl();
+ URL consumerUrl = url.addParameter(GROUP_KEY, providerUrl.getGroup())
+ .addParameter(VERSION_KEY, providerUrl.getVersion());
+ InjvmInvoker<T> invoker = new InjvmInvoker<>(type, consumerUrl, consumerUrl.getServiceKey(), exporter);
+ result.add(invoker);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello1ServiceImpl.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello1ServiceImpl.java
new file mode 100644
index 0000000000..ee5b7737e2
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello1ServiceImpl.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.injvm;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Hello1ServiceImpl implements HelloService {
+
+ @Override
+ public List<String> hellos() {
+ List<String> res = new ArrayList<>();
+ res.add("consumer-hello-1");
+ return res;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello2ServiceImpl.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello2ServiceImpl.java
new file mode 100644
index 0000000000..8937f5096c
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/Hello2ServiceImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.injvm;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class Hello2ServiceImpl implements HelloService {
+
+ @Override
+ public List<String> hellos() {
+ List<String> res = new ArrayList<>();
+ res.add("consumer-hello-2");
+ return res;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/HelloService.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/HelloService.java
new file mode 100644
index 0000000000..cd6bc45c62
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/HelloService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.injvm;
+
+import java.util.List;
+
+public interface HelloService {
+ List<String> hellos();
+}
diff --git a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java
index 4e57d054b4..d7dd373591 100644
--- a/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java
+++ b/dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java
@@ -29,18 +29,16 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.*;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
import static org.apache.dubbo.rpc.Constants.SCOPE_LOCAL;
import static org.apache.dubbo.rpc.Constants.SCOPE_REMOTE;
+import static org.apache.dubbo.rpc.Constants.MERGER_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -76,7 +74,7 @@ public class InjvmProtocolTest {
assertEquals(service.getSize(new String[]{"", "", ""}), 3);
service.invoke("injvm://127.0.0.1/TestService", "invoke");
- InjvmInvoker<?> injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new HashMap<>());
+ InjvmInvoker<?> injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, null);
assertFalse(injvmInvoker.isAvailable());
}
@@ -137,4 +135,36 @@ public class InjvmProtocolTest {
assertNull(service.getAsyncResult());
}
+ @Test
+ public void testLocalProtocolForMergeResult() throws Exception {
+ HelloService helloService1 = new Hello1ServiceImpl();
+ URL url = URL.valueOf("injvm://127.0.0.1/HelloService")
+ .addParameter(INTERFACE_KEY, HelloService.class.getName())
+ .addParameter(APPLICATION_KEY, "consumer")
+ .addParameter(GROUP_KEY, "g1");
+ Invoker<?> invoker1 = proxy.getInvoker(helloService1, HelloService.class, url);
+ assertTrue(invoker1.isAvailable());
+ Exporter<?> exporter1 = protocol.export(invoker1);
+ exporters.add(exporter1);
+
+ URL url2 = URL.valueOf("injvm://127.0.0.1/HelloService")
+ .addParameter(INTERFACE_KEY, HelloService.class.getName())
+ .addParameter(APPLICATION_KEY, "consumer")
+ .addParameter(GROUP_KEY, "g2");
+ HelloService helloService2 = new Hello2ServiceImpl();
+ Invoker<?> invoker2 = proxy.getInvoker(helloService2, HelloService.class, url2);
+ assertTrue(invoker2.isAvailable());
+ Exporter<?> exporter2 = protocol.export(invoker2);
+ exporters.add(exporter2);
+
+
+ URL referUrl = URL.valueOf("injvm://127.0.0.1/HelloService")
+ .addParameter(INTERFACE_KEY, HelloService.class.getName())
+ .addParameter(APPLICATION_KEY, "consumer")
+ .addParameter(GROUP_KEY, "*")
+ .addParameter(MERGER_KEY, "list");
+ List<String> list = proxy.getProxy(protocol.refer(HelloService.class, referUrl)).hellos();
+ assertEquals(2, list.size());
+ }
+
}