This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 8dec7d411d Fix wildcard match (#10256)
8dec7d411d is described below
commit 8dec7d411d0f045537516f3db2f916fd11486a72
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Jul 14 17:43:26 2022 +0800
Fix wildcard match (#10256)
---
.../apache/dubbo/common/ProtocolServiceKey.java | 124 ++++++++++++
.../java/org/apache/dubbo/common/ServiceKey.java | 146 +++++++++++++
.../common/ProtocolServiceKeyMatcherTest.java | 107 ++++++++++
.../dubbo/common/ProtocolServiceKeyTest.java | 76 +++++++
.../apache/dubbo/common/ServiceKeyMatcherTest.java | 224 ++++++++++++++++++++
.../org/apache/dubbo/common/ServiceKeyTest.java | 54 +++++
.../org/apache/dubbo/metadata/MetadataInfo.java | 29 ++-
.../registry/client/DefaultServiceInstance.java | 6 +
.../dubbo/registry/client/InstanceAddressURL.java | 21 ++
.../registry/client/ServiceDiscoveryRegistry.java | 8 +-
.../client/ServiceDiscoveryRegistryDirectory.java | 208 ++++++++++++++++---
.../listener/ServiceInstancesChangedListener.java | 213 ++++++++-----------
.../client/ServiceDiscoveryRegistryTest.java | 10 +-
.../MockServiceInstancesChangedListener.java | 5 +-
.../ServiceInstancesChangedListenerTest.java | 225 +++++++++++++++------
15 files changed, 1212 insertions(+), 244 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/ProtocolServiceKey.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/ProtocolServiceKey.java
new file mode 100644
index 0000000000..0cc8b203d4
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/ProtocolServiceKey.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Objects;
+
+public class ProtocolServiceKey extends ServiceKey {
+ private final String protocol;
+
+ public ProtocolServiceKey(String interfaceName, String version, String
group, String protocol) {
+ super(interfaceName, version, group);
+ this.protocol = protocol;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public String getServiceKeyString() {
+ return super.toString();
+ }
+
+ public boolean isSameWith(ProtocolServiceKey protocolServiceKey) {
+ // interface version group should be the same
+ if (!super.equals(protocolServiceKey)) {
+ return false;
+ }
+
+ // this key's protocol is the wildcard *, which is never the same as any protocol
+ if (CommonConstants.ANY_VALUE.equals(protocol)) {
+ return false;
+ }
+
+ // if either key's protocol is empty, the protocol is not compared: treat as same
+ if (StringUtils.isEmpty(protocol) ||
StringUtils.isEmpty(protocolServiceKey.getProtocol())) {
+ return true;
+ }
+
+ // otherwise both protocols are non-empty and must be equal
+ return Objects.equals(protocol, protocolServiceKey.getProtocol());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ProtocolServiceKey that = (ProtocolServiceKey) o;
+ return Objects.equals(protocol, that.protocol);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), protocol);
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + CommonConstants.GROUP_CHAR_SEPARATOR +
protocol;
+ }
+
+ public static class Matcher {
+ public static boolean isMatch(ProtocolServiceKey rule,
ProtocolServiceKey target) {
+ // 1. 2. 3. match interface / version / group
+ if (!ServiceKey.Matcher.isMatch(rule, target)) {
+ return false;
+ }
+
+ // 4.match protocol
+ // 4.1. if rule protocol is *, match all
+ if (!CommonConstants.ANY_VALUE.equals(rule.getProtocol())) {
+ // 4.2. if rule protocol is null, match all
+ if (StringUtils.isNotEmpty(rule.getProtocol())) {
+ // 4.3. if rule protocol contains ',', split and match each
+ if
(rule.getProtocol().contains(CommonConstants.COMMA_SEPARATOR)) {
+ String[] protocols = rule.getProtocol().split("\\"
+CommonConstants.COMMA_SEPARATOR, -1);
+ boolean match = false;
+ for (String protocol : protocols) {
+ protocol = protocol.trim();
+ if (StringUtils.isEmpty(protocol) &&
StringUtils.isEmpty(target.getProtocol())) {
+ match = true;
+ break;
+ } else if (protocol.equals(target.getProtocol())) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ return false;
+ }
+ }
+ // 4.4. if rule protocol does not contain ',', match
directly
+ else if (!Objects.equals(rule.getProtocol(),
target.getProtocol())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/ServiceKey.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/ServiceKey.java
new file mode 100644
index 0000000000..c8d8f846cc
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/ServiceKey.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.StringUtils;
+
+import java.util.Objects;
+
+public class ServiceKey {
+ private final String interfaceName;
+ private final String group;
+ private final String version;
+
+ public ServiceKey(String interfaceName, String version, String group) {
+ this.interfaceName = interfaceName;
+ this.group = group;
+ this.version = version;
+ }
+
+ public String getInterfaceName() {
+ return interfaceName;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ServiceKey that = (ServiceKey) o;
+ return Objects.equals(interfaceName, that.interfaceName) &&
Objects.equals(group, that.group) && Objects.equals(version, that.version);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(interfaceName, group, version);
+ }
+
+ @Override
+ public String toString() {
+ return BaseServiceMetadata.buildServiceKey(interfaceName, group,
version);
+ }
+
+
+ public static class Matcher {
+ public static boolean isMatch(ServiceKey rule, ServiceKey target) {
+ // 1. match interface (accurate match)
+ if (!Objects.equals(rule.getInterfaceName(),
target.getInterfaceName())) {
+ return false;
+ }
+
+ // 2. match version (accurate match)
+ // 2.1. if rule version is *, match all
+ if (!CommonConstants.ANY_VALUE.equals(rule.getVersion())) {
+ // 2.2. if rule version is null, target version should be null
+ if (StringUtils.isEmpty(rule.getVersion()) &&
!StringUtils.isEmpty(target.getVersion())) {
+ return false;
+ }
+ if (!StringUtils.isEmpty(rule.getVersion())) {
+ // 2.3. if rule version contains ',', split and match each
+ if
(rule.getVersion().contains(CommonConstants.COMMA_SEPARATOR)) {
+ String[] versions = rule.getVersion().split("\\"
+CommonConstants.COMMA_SEPARATOR, -1);
+ boolean match = false;
+ for (String version : versions) {
+ version = version.trim();
+ if (StringUtils.isEmpty(version) &&
StringUtils.isEmpty(target.getVersion())) {
+ match = true;
+ break;
+ } else if (version.equals(target.getVersion())) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ return false;
+ }
+ }
+ // 2.4. if rule version does not contain ',', match directly
+ else if (!Objects.equals(rule.getVersion(),
target.getVersion())) {
+ return false;
+ }
+ }
+ }
+
+ // 3. match group (wildcard match)
+ // 3.1. if rule group is *, match all
+ if (!CommonConstants.ANY_VALUE.equals(rule.getGroup())) {
+ // 3.2. if rule group is null, target group should be null
+ if (StringUtils.isEmpty(rule.getGroup()) &&
!StringUtils.isEmpty(target.getGroup())) {
+ return false;
+ }
+ if (!StringUtils.isEmpty(rule.getGroup())) {
+ // 3.3. if rule group contains ',', split and match each
+ if
(rule.getGroup().contains(CommonConstants.COMMA_SEPARATOR)) {
+ String[] groups = rule.getGroup().split("\\"
+CommonConstants.COMMA_SEPARATOR, -1);
+ boolean match = false;
+ for (String group : groups) {
+ group = group.trim();
+ if (StringUtils.isEmpty(group) &&
StringUtils.isEmpty(target.getGroup())) {
+ match = true;
+ break;
+ } else if (group.equals(target.getGroup())) {
+ match = true;
+ break;
+ }
+ }
+ if (!match) {
+ return false;
+ }
+ }
+ // 3.4. if rule group does not contain ',', match directly
+ else if (!Objects.equals(rule.getGroup(),
target.getGroup())) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+ }
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyMatcherTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyMatcherTest.java
new file mode 100644
index 0000000000..635045c408
--- /dev/null
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyMatcherTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ProtocolServiceKeyMatcherTest {
+
+ @Test
+ public void testProtocol() {
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo"),
+ new ProtocolServiceKey("DemoService", null, null, "dubbo")
+ ));
+
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, null),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ""),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "*"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo1")
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo2")
+ ));
+
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,,dubbo2"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "")
+ ));
+
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ",dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ",dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "")
+ ));
+
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2,"),
+ new ProtocolServiceKey(null, null, null, null)
+ ));
+ Assertions.assertTrue(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2,"),
+ new ProtocolServiceKey(null, null, null, "")
+ ));
+
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, ",dubbo1,dubbo2"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ Assertions.assertFalse(ProtocolServiceKey.Matcher.isMatch(
+ new ProtocolServiceKey(null, null, null, "dubbo1,dubbo2,"),
+ new ProtocolServiceKey(null, null, null, "dubbo")
+ ));
+ }
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyTest.java
new file mode 100644
index 0000000000..09b2711aa2
--- /dev/null
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/ProtocolServiceKeyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ProtocolServiceKeyTest {
+ @Test
+ public void test() {
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1");
+ Assertions.assertEquals("DemoService",
protocolServiceKey.getInterfaceName());
+ Assertions.assertEquals("1.0.0", protocolServiceKey.getVersion());
+ Assertions.assertEquals("group1", protocolServiceKey.getGroup());
+ Assertions.assertEquals("protocol1", protocolServiceKey.getProtocol());
+
+ Assertions.assertEquals("group1/DemoService:1.0.0:protocol1",
protocolServiceKey.toString());
+ Assertions.assertEquals("group1/DemoService:1.0.0",
protocolServiceKey.getServiceKeyString());
+
+ Assertions.assertEquals(protocolServiceKey, protocolServiceKey);
+
+ ProtocolServiceKey protocolServiceKey1 = new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1");
+ Assertions.assertEquals(protocolServiceKey, protocolServiceKey1);
+ Assertions.assertEquals(protocolServiceKey.hashCode(),
protocolServiceKey1.hashCode());
+
+ ProtocolServiceKey protocolServiceKey2 = new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol2");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey2);
+
+ ProtocolServiceKey protocolServiceKey3 = new
ProtocolServiceKey("DemoService", "1.0.0", "group2", "protocol1");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey3);
+
+ ProtocolServiceKey protocolServiceKey4 = new
ProtocolServiceKey("DemoService", "1.0.1", "group1", "protocol1");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey4);
+
+ ProtocolServiceKey protocolServiceKey5 = new
ProtocolServiceKey("DemoInterface", "1.0.0", "group1", "protocol1");
+ Assertions.assertNotEquals(protocolServiceKey, protocolServiceKey5);
+
+ ServiceKey serviceKey = new ServiceKey("DemoService", "1.0.0",
"group1");
+ Assertions.assertNotEquals(protocolServiceKey, serviceKey);
+
+
Assertions.assertTrue(protocolServiceKey.isSameWith(protocolServiceKey));
+ Assertions.assertTrue(protocolServiceKey.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "")));
+ Assertions.assertTrue(protocolServiceKey.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", null)));
+
+ Assertions.assertFalse(protocolServiceKey.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group2", "protocol1")));
+ Assertions.assertFalse(protocolServiceKey.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group2", "")));
+ Assertions.assertFalse(protocolServiceKey.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group2", null)));
+
+
+ ProtocolServiceKey protocolServiceKey6 = new
ProtocolServiceKey("DemoService", "1.0.0", "group1", null);
+
Assertions.assertTrue(protocolServiceKey6.isSameWith(protocolServiceKey6));
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "")));
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1")));
+ Assertions.assertTrue(protocolServiceKey6.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol2")));
+
+ ProtocolServiceKey protocolServiceKey7 = new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "*");
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", null)));
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "")));
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1")));
+ Assertions.assertFalse(protocolServiceKey7.isSameWith(new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol2")));
+ }
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyMatcherTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyMatcherTest.java
new file mode 100644
index 0000000000..6d85c46b31
--- /dev/null
+++
b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyMatcherTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ServiceKeyMatcherTest {
+
+ @Test
+ public void testInterface() {
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey("DemoService", null, null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, null),
+ new ServiceKey("DemoService", null, null)
+ ));
+
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey("*", null, null),
+ new ServiceKey("DemoService", null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey("*", null, null),
+ new ServiceKey(null, null, null)
+ ));
+ }
+
+ @Test
+ public void testVersion() {
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0", null),
+ new ServiceKey(null, "1.0.0", null)
+ ));
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0", null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, null),
+ new ServiceKey(null, "1.0.0", null)
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1", null),
+ new ServiceKey(null, "1.0.0", null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1", null),
+ new ServiceKey(null, "1.0.2", null)
+ ));
+
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1", null),
+ new ServiceKey(null, null, null)
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, ",1.0.0,1.0.1", null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, ",1.0.0,1.0.1", null),
+ new ServiceKey(null, "", null)
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,,1.0.1", null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,,1.0.1", null),
+ new ServiceKey(null, "", null)
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1,", null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1,", null),
+ new ServiceKey(null, "", null)
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1", null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "1.0.0,1.0.1", null),
+ new ServiceKey(null, "", null)
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, ",1.0.0,1.0.1", null),
+ new ServiceKey(null, "1.0.2", null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, ",1.0.0,1.0.1", null),
+ new ServiceKey(null, "1.0.2", null)
+ ));
+
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "*", null),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "*", null),
+ new ServiceKey(null, "", null)
+ ));
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, "*", null),
+ new ServiceKey(null, "1.0.0", null)
+ ));
+ }
+
+ @Test
+ public void testGroup() {
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group1"),
+ new ServiceKey(null, null, "group1")
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group1"),
+ new ServiceKey(null, null, null)
+ ));
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, null),
+ new ServiceKey(null, null, "group1")
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group1, group2"),
+ new ServiceKey(null, null, "group1")
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2, group3"),
+ new ServiceKey(null, null, "group1")
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2, group3"),
+ new ServiceKey(null, null, null)
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2, group3"),
+ new ServiceKey(null, null, "")
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, ",group2"),
+ new ServiceKey(null, null, "")
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2,"),
+ new ServiceKey(null, null, "")
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2, ,group3"),
+ new ServiceKey(null, null, "")
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, ",group2"),
+ new ServiceKey(null, null, "group1")
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2,"),
+ new ServiceKey(null, null, "group1")
+ ));
+
+ Assertions.assertFalse(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "group2, ,group3"),
+ new ServiceKey(null, null, "group1")
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "*"),
+ new ServiceKey(null, null, "")
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "*"),
+ new ServiceKey(null, null, null)
+ ));
+
+ Assertions.assertTrue(ServiceKey.Matcher.isMatch(
+ new ServiceKey(null, null, "*"),
+ new ServiceKey(null, null, "group1")
+ ));
+ }
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyTest.java
new file mode 100644
index 0000000000..5643166ffa
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/ServiceKeyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class ServiceKeyTest {
+ @Test
+ public void test() {
+ ServiceKey serviceKey = new ServiceKey("DemoService", "1.0.0",
"group1");
+
+ Assertions.assertEquals("DemoService", serviceKey.getInterfaceName());
+ Assertions.assertEquals("1.0.0", serviceKey.getVersion());
+ Assertions.assertEquals("group1", serviceKey.getGroup());
+
+ Assertions.assertEquals("group1/DemoService:1.0.0",
serviceKey.toString());
+ Assertions.assertEquals("DemoService", new ServiceKey("DemoService",
null, null).toString());
+ Assertions.assertEquals("DemoService:1.0.0", new
ServiceKey("DemoService", "1.0.0", null).toString());
+ Assertions.assertEquals("group1/DemoService", new
ServiceKey("DemoService", null, "group1").toString());
+
+ Assertions.assertEquals(serviceKey, serviceKey);
+
+ ServiceKey serviceKey1 = new ServiceKey("DemoService", "1.0.0",
"group1");
+ Assertions.assertEquals(serviceKey, serviceKey1);
+ Assertions.assertEquals(serviceKey.hashCode(), serviceKey1.hashCode());
+
+ ServiceKey serviceKey2 = new ServiceKey("DemoService", "1.0.0",
"group2");
+ Assertions.assertNotEquals(serviceKey, serviceKey2);
+
+ ServiceKey serviceKey3 = new ServiceKey("DemoService", "1.0.1",
"group1");
+ Assertions.assertNotEquals(serviceKey, serviceKey3);
+
+ ServiceKey serviceKey4 = new ServiceKey("DemoInterface", "1.0.0",
"group1");
+ Assertions.assertNotEquals(serviceKey, serviceKey4);
+
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey("DemoService", "1.0.0", "group1", "protocol1");
+ Assertions.assertNotEquals(serviceKey, protocolServiceKey);
+ }
+}
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 2735b931ee..0465fbb228 100644
---
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.metadata;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
@@ -437,6 +438,7 @@ public class MetadataInfo implements Serializable {
private String group;
private String version;
private String protocol;
+ private int port = -1;
private String path; // most of the time, path is the same with the
interface name.
private Map<String, String> params;
@@ -453,12 +455,14 @@ public class MetadataInfo implements Serializable {
// service + group + version + protocol
private volatile transient String matchKey;
+ private volatile transient ProtocolServiceKey protocolServiceKey;
+
private transient URL url;
public ServiceInfo() {}
public ServiceInfo(URL url, List<MetadataParamsFilter> filters) {
- this(url.getServiceInterface(), url.getGroup(), url.getVersion(),
url.getProtocol(), url.getPath(), null);
+ this(url.getServiceInterface(), url.getGroup(), url.getVersion(),
url.getProtocol(), url.getPort(), url.getPath(), null);
this.url = url;
Map<String, String> params = extractServiceParams(url, filters);
// initialize method params caches.
@@ -466,11 +470,12 @@ public class MetadataInfo implements Serializable {
this.consumerMethodParams =
URLParam.initMethodParameters(consumerParams);
}
- public ServiceInfo(String name, String group, String version, String
protocol, String path, Map<String, String> params) {
+ public ServiceInfo(String name, String group, String version, String
protocol, int port, String path, Map<String, String> params) {
this.name = name;
this.group = group;
this.version = version;
this.protocol = protocol;
+ this.port = port;
this.path = path;
this.params = params == null ? new ConcurrentHashMap<>() : params;
@@ -577,6 +582,14 @@ public class MetadataInfo implements Serializable {
return matchKey;
}
+ public ProtocolServiceKey getProtocolServiceKey() {
+ if (protocolServiceKey != null) {
+ return protocolServiceKey;
+ }
+ protocolServiceKey = new ProtocolServiceKey(name, version, group,
protocol);
+ return protocolServiceKey;
+ }
+
private String buildServiceKey(String name, String group, String
version) {
this.serviceKey = URL.buildKey(name, group, version);
return this.serviceKey;
@@ -630,6 +643,14 @@ public class MetadataInfo implements Serializable {
this.protocol = protocol;
}
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
public Map<String, String> getParams() {
if (params == null) {
return Collections.emptyMap();
@@ -759,12 +780,13 @@ public class MetadataInfo implements Serializable {
&& Objects.equals(this.getGroup(), serviceInfo.getGroup())
&& Objects.equals(this.getName(), serviceInfo.getName())
&& Objects.equals(this.getProtocol(),
serviceInfo.getProtocol())
+ && Objects.equals(this.getPort(), serviceInfo.getPort())
&& this.getParams().equals(serviceInfo.getParams());
}
@Override
public int hashCode() {
- return Objects.hash(getVersion(), getGroup(), getName(),
getProtocol(), getParams());
+ return Objects.hash(getVersion(), getGroup(), getName(),
getProtocol(), getPort(), getParams());
}
@Override
@@ -778,6 +800,7 @@ public class MetadataInfo implements Serializable {
"group='" + group + "'," +
"version='" + version + "'," +
"protocol='" + protocol + "'," +
+ "port='" + port + "'," +
"params=" + params + "," +
"}";
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 074811beed..432a77738a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -245,6 +245,12 @@ public class DefaultServiceInstance implements ServiceInstance {
return copyOfInstance;
}
+ public DefaultServiceInstance copyFrom(int port) {
+ DefaultServiceInstance copyOfInstance = new
DefaultServiceInstance(this);
+ copyOfInstance.setPort(port);
+ return copyOfInstance;
+ }
+
@Override
public Map<String, String> getAllParams() {
if (extendParams == null) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 8e8b2f9424..225b6e6e2e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.url.component.URLAddress;
import org.apache.dubbo.common.url.component.URLParam;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -126,6 +127,26 @@ public class InstanceAddressURL extends URL {
return RpcContext.getServiceContext().getServiceKey();
}
+ @Override
+ public URL setProtocol(String protocol) {
+ return new ServiceConfigURL(protocol, getUsername(), getPassword(),
getHost(), getPort(), getPath(), getParameters(), attributes);
+ }
+
+ @Override
+ public URL setHost(String host) {
+ return new ServiceConfigURL(getProtocol(), getUsername(),
getPassword(), host, getPort(), getPath(), getParameters(), attributes);
+ }
+
+ @Override
+ public URL setPort(int port) {
+ return new ServiceConfigURL(getProtocol(), getUsername(),
getPassword(), getHost(), port, getPath(), getParameters(), attributes);
+ }
+
+ @Override
+ public URL setPath(String path) {
+ return new ServiceConfigURL(getProtocol(), getUsername(),
getPassword(), getHost(), getPort(), path, getParameters(), attributes);
+ }
+
@Override
public String getAddress() {
return instance.getAddress();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 30eb983edd..a1aaf01cfe 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -299,8 +299,8 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
protected void subscribeURLs(URL url, NotifyListener listener, Set<String>
serviceNames) {
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
- String protocolServiceKey = url.getProtocolServiceKey();
- logger.info(String.format("Trying to subscribe from apps %s for
service key %s, ", serviceNamesKey, protocolServiceKey));
+ String serviceKey = url.getServiceKey();
+ logger.info(String.format("Trying to subscribe from apps %s for
service key %s, ", serviceNamesKey, serviceKey));
// register ServiceInstancesChangedListener
Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
@@ -322,7 +322,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (!serviceInstancesChangedListener.isDestroyed()) {
serviceInstancesChangedListener.setUrl(url);
listener.addServiceListener(serviceInstancesChangedListener);
-
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey,
listener);
+ serviceInstancesChangedListener.addListenerAndNotify(url,
listener);
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
} else {
logger.info(String.format("Listener of %s has been destroyed
by another thread.", serviceNamesKey));
@@ -398,7 +398,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
Lock appSubscriptionLock =
getAppSubscription(appKey);
try {
appSubscriptionLock.lock();
-
oldListener.removeListener(url.getProtocolServiceKey(), listener);
+
oldListener.removeListener(url.getServiceKey(), listener);
if (!oldListener.hasListeners()) {
oldListener.destroy();
removeAppSubscriptionLock(appKey);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index bfe1b8b383..7bb611e3d3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -16,8 +16,10 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -30,26 +32,35 @@ import org.apache.dubbo.registry.Constants;
import org.apache.dubbo.registry.ProviderFirstParams;
import org.apache.dubbo.registry.integration.AbstractConfiguratorListener;
import org.apache.dubbo.registry.integration.DynamicDirectory;
+import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcServiceContext;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static
org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_HASHMAP_LOAD_FACTOR;
import static
org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
@@ -64,13 +75,15 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
* instance address to invoker mapping.
* The initial value is null and the midway may be assigned to null,
please use the local variable reference
*/
- private volatile Map<String, Invoker<T>> urlInvokerMap;
+ private volatile Map<ProtocolServiceKeyWithAddress, Invoker<T>>
urlInvokerMap;
private volatile ReferenceConfigurationListener
referenceConfigurationListener;
private volatile boolean enableConfigurationListen = true;
private volatile List<URL> originalUrls = null;
private volatile Map<String, String> overrideQueryMap;
private final Set<String> providerFirstParams;
private final ModuleModel moduleModel;
+ private final ProtocolServiceKey consumerProtocolServiceKey;
+ private final Map<ProtocolServiceKey, URL> customizedConsumerUrlMap = new
ConcurrentHashMap<>();
public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
super(serviceType, url);
@@ -94,6 +107,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
}
+ String protocol = consumerUrl.getParameter(PROTOCOL_KEY,
consumerUrl.getProtocol());
+ consumerProtocolServiceKey = new
ProtocolServiceKey(consumerUrl.getServiceInterface(), consumerUrl.getVersion(),
consumerUrl.getGroup(),
+ !CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
}
@Override
@@ -224,15 +240,15 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
// use local reference to avoid NPE as this.urlInvokerMap will be
set null concurrently at destroyAllInvokers().
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap
= this.urlInvokerMap;
// can't use local reference as oldUrlInvokerMap's mappings might
be removed directly at toInvokers().
- Map<String, Invoker<T>> oldUrlInvokerMap = null;
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap =
null;
if (localUrlInvokerMap != null) {
// the initial capacity should be set greater than the maximum
number of entries divided by the load factor to avoid resizing.
oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 +
localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
localUrlInvokerMap.forEach(oldUrlInvokerMap::put);
}
- Map<String, Invoker<T>> newUrlInvokerMap =
toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap =
toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
logger.info("Refreshed invoker size " + newUrlInvokerMap.size());
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
@@ -266,11 +282,12 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
* @param urls
* @return invokers
*/
- private Map<String, Invoker<T>> toInvokers(Map<String, Invoker<T>>
oldUrlInvokerMap, List<URL> urls) {
- Map<String, Invoker<T>> newUrlInvokerMap = new
ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
+ private Map<ProtocolServiceKeyWithAddress, Invoker<T>>
toInvokers(Map<ProtocolServiceKeyWithAddress, Invoker<T>> oldUrlInvokerMap,
List<URL> urls) {
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> newUrlInvokerMap = new
ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
+
for (URL url : urls) {
InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
@@ -291,33 +308,59 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
instanceAddressURL =
overrideWithConfigurator(instanceAddressURL);
}
- Invoker<T> invoker = oldUrlInvokerMap == null ? null :
oldUrlInvokerMap.get(instanceAddressURL.getAddress());
- if (invoker == null || urlChanged(invoker, instanceAddressURL)) {
// Not in the cache, refer again
- try {
- boolean enabled = true;
- if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
- enabled =
!instanceAddressURL.getParameter(DISABLED_KEY, false);
- } else {
- enabled = instanceAddressURL.getParameter(ENABLED_KEY,
true);
+ // filter all the service available (version wildcard, group
wildcard, protocol wildcard)
+ int port = instanceAddressURL.getPort();
+ List<ProtocolServiceKey> matchedProtocolServiceKeys =
instanceAddressURL.getMetadataInfo()
+ .getServices().values()
+ .stream()
+ .filter(serviceInfo -> serviceInfo.getPort() <= 0 ||
serviceInfo.getPort() == port)
+ .map(MetadataInfo.ServiceInfo::getProtocolServiceKey)
+ .filter(key ->
ProtocolServiceKey.Matcher.isMatch(consumerProtocolServiceKey, key))
+ .collect(Collectors.toList());
+
+ // see org.apache.dubbo.common.ProtocolServiceKey.isSameWith
+ // check if needed to override the consumer url
+ boolean shouldWrap = matchedProtocolServiceKeys.size() != 1 ||
!consumerProtocolServiceKey.isSameWith(matchedProtocolServiceKeys.get(0));
+
+ for (ProtocolServiceKey matchedProtocolServiceKey :
matchedProtocolServiceKeys) {
+ ProtocolServiceKeyWithAddress protocolServiceKeyWithAddress =
new ProtocolServiceKeyWithAddress(matchedProtocolServiceKey,
instanceAddressURL.getAddress());
+ Invoker<T> invoker = oldUrlInvokerMap == null ? null :
oldUrlInvokerMap.get(protocolServiceKeyWithAddress);
+ if (invoker == null || urlChanged(invoker, instanceAddressURL,
matchedProtocolServiceKey)) { // Not in the cache, refer again
+ try {
+ boolean enabled;
+ if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
+ enabled =
!instanceAddressURL.getParameter(DISABLED_KEY, false);
+ } else {
+ enabled =
instanceAddressURL.getParameter(ENABLED_KEY, true);
+ }
+ if (enabled) {
+ if (shouldWrap) {
+ URL newConsumerUrl =
customizedConsumerUrlMap.computeIfAbsent(matchedProtocolServiceKey,
+ k ->
consumerUrl.setProtocol(k.getProtocol())
+
.addParameter(CommonConstants.GROUP_KEY, k.getGroup())
+
.addParameter(CommonConstants.VERSION_KEY, k.getVersion()));
+
RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+ invoker = new
InstanceWrappedInvoker<>(protocol.refer(serviceType, instanceAddressURL),
newConsumerUrl, matchedProtocolServiceKey);
+ } else {
+ invoker = protocol.refer(serviceType,
instanceAddressURL);
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("Failed to refer invoker for interface:"
+ serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
}
- if (enabled) {
- invoker = protocol.refer(serviceType,
instanceAddressURL);
+ if (invoker != null) { // Put new invoker in cache
+ newUrlInvokerMap.put(protocolServiceKeyWithAddress,
invoker);
}
- } catch (Throwable t) {
- logger.error("Failed to refer invoker for interface:" +
serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
+ } else {
+ newUrlInvokerMap.put(protocolServiceKeyWithAddress,
invoker);
+ oldUrlInvokerMap.remove(protocolServiceKeyWithAddress,
invoker);
}
- if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(instanceAddressURL.getAddress(),
invoker);
- }
- } else {
- newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
- oldUrlInvokerMap.remove(instanceAddressURL.getAddress(),
invoker);
}
}
return newUrlInvokerMap;
}
- private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) {
+ private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL,
ProtocolServiceKey protocolServiceKey) {
InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();
if (!newURL.getInstance().equals(oldURL.getInstance())) {
@@ -335,16 +378,35 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
}
- MetadataInfo.ServiceInfo oldServiceInfo =
oldURL.getMetadataInfo().getValidServiceInfo(getConsumerUrl().getProtocolServiceKey());
+ MetadataInfo.ServiceInfo oldServiceInfo =
oldURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString());
if (null == oldServiceInfo) {
return false;
}
- return
!oldServiceInfo.equals(newURL.getMetadataInfo().getValidServiceInfo(getConsumerUrl().getProtocolServiceKey()));
+ return
!oldServiceInfo.equals(newURL.getMetadataInfo().getValidServiceInfo(protocolServiceKey.toString()));
}
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
- return invokers;
+ List<Invoker<T>> mergedInvokers = new ArrayList<>();
+ Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
+ for (Invoker<T> invoker : invokers) {
+ String group = invoker.getUrl().getGroup("");
+ groupMap.computeIfAbsent(group, k -> new ArrayList<>());
+ groupMap.get(group).add(invoker);
+ }
+
+ if (groupMap.size() == 1) {
+ mergedInvokers.addAll(groupMap.values().iterator().next());
+ } else if (groupMap.size() > 1) {
+ for (List<Invoker<T>> groupList : groupMap.values()) {
+ StaticDirectory<T> staticDirectory = new
StaticDirectory<>(groupList);
+ staticDirectory.buildRouterChain();
+ mergedInvokers.add(cluster.join(staticDirectory, false));
+ }
+ } else {
+ mergedInvokers = invokers;
+ }
+ return mergedInvokers;
}
/**
@@ -352,7 +414,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
*/
@Override
protected void destroyAllInvokers() {
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; //
local reference
+ Map<ProtocolServiceKeyWithAddress, Invoker<T>> localUrlInvokerMap =
this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new
ArrayList<>(localUrlInvokerMap.values())) {
try {
@@ -375,7 +437,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
- private void destroyUnusedInvokers(Map<String, Invoker<T>>
oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
+ private void destroyUnusedInvokers(Map<ProtocolServiceKeyWithAddress,
Invoker<T>> oldUrlInvokerMap, Map<ProtocolServiceKeyWithAddress, Invoker<T>>
newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
@@ -385,7 +447,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
return;
}
- for (Map.Entry<String, Invoker<T>> entry :
oldUrlInvokerMap.entrySet()) {
+ for (Map.Entry<ProtocolServiceKeyWithAddress, Invoker<T>> entry :
oldUrlInvokerMap.entrySet()) {
Invoker<T> invoker = entry.getValue();
if (invoker != null) {
try {
@@ -461,4 +523,88 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
});
}
}
+
+ public static final class ProtocolServiceKeyWithAddress extends
ProtocolServiceKey {
+ private final String address;
+
+ public ProtocolServiceKeyWithAddress(ProtocolServiceKey
protocolServiceKey, String address) {
+ super(protocolServiceKey.getInterfaceName(),
protocolServiceKey.getVersion(), protocolServiceKey.getGroup(),
protocolServiceKey.getProtocol());
+ this.address = address;
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ProtocolServiceKeyWithAddress that =
(ProtocolServiceKeyWithAddress) o;
+ return Objects.equals(address, that.address);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), address);
+ }
+ }
+
+ public static final class InstanceWrappedInvoker<T> implements Invoker<T> {
+ private final Invoker<T> originInvoker;
+ private final URL newConsumerUrl;
+ private final ProtocolServiceKey protocolServiceKey;
+
+ public InstanceWrappedInvoker(Invoker<T> originInvoker, URL
newConsumerUrl, ProtocolServiceKey protocolServiceKey) {
+ this.originInvoker = originInvoker;
+ this.newConsumerUrl = newConsumerUrl;
+ this.protocolServiceKey = protocolServiceKey;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return originInvoker.getInterface();
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ // override consumer url with real protocol service key
+ RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+ // recreate invocation due to the protocol service key changed
+ RpcInvocation copiedInvocation = new
RpcInvocation(invocation.getTargetServiceUniqueName(),
+ invocation.getServiceModel(), invocation.getMethodName(),
invocation.getServiceName(), protocolServiceKey.toString(),
+ invocation.getParameterTypes(), invocation.getArguments(),
invocation.getObjectAttachments(),
+ invocation.getInvoker(), invocation.getAttributes(),
+ invocation instanceof RpcInvocation ? ((RpcInvocation)
invocation).getInvokeMode() : null);
+ copiedInvocation.setObjectAttachment(CommonConstants.GROUP_KEY,
protocolServiceKey.getGroup());
+ copiedInvocation.setObjectAttachment(CommonConstants.VERSION_KEY,
protocolServiceKey.getVersion());
+ return originInvoker.invoke(copiedInvocation);
+ }
+
+ @Override
+ public URL getUrl() {
+ RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+ return originInvoker.getUrl();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+ return originInvoker.isAvailable();
+ }
+
+ @Override
+ public void destroy() {
+ RpcContext.getServiceContext().setConsumerUrl(newConsumerUrl);
+ originInvoker.destroy();
+ }
+ }
+
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 486fd90868..0ec60af5ae 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -16,14 +16,15 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
-import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
import org.apache.dubbo.registry.NotifyListener;
@@ -38,7 +39,6 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -52,8 +52,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static java.util.Collections.emptySet;
-import static
org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static
org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static
org.apache.dubbo.common.constants.RegistryConstants.ENABLE_EMPTY_PROTECTION_KEY;
@@ -77,7 +75,7 @@ public class ServiceInstancesChangedListener {
protected AtomicBoolean destroyed = new AtomicBoolean(false);
protected Map<String, List<ServiceInstance>> allInstances;
- protected Map<String, Object> serviceUrls;
+ protected Map<String, List<ProtocolServiceKeyWithUrls>> serviceUrls;
private volatile long lastRefreshTime;
private final Semaphore retryPermission;
@@ -96,8 +94,7 @@ public class ServiceInstancesChangedListener {
this.allInstances = new HashMap<>();
this.serviceUrls = new HashMap<>();
retryPermission = new Semaphore(1);
- this.scheduler = ScopeModelUtil.getApplicationModel(serviceDiscovery
== null || serviceDiscovery.getUrl() == null ? null :
serviceDiscovery.getUrl().getScopeModel())
-
.getBeanFactory().getBean(FrameworkExecutorRepository.class).getMetadataRetryExecutor();
+ this.scheduler = ScopeModelUtil.getApplicationModel(serviceDiscovery
== null || serviceDiscovery.getUrl() == null ? null :
serviceDiscovery.getUrl().getScopeModel()).getBeanFactory().getBean(FrameworkExecutorRepository.class).getMetadataRetryExecutor();
}
/**
@@ -127,7 +124,7 @@ public class ServiceInstancesChangedListener {
}
Map<String, List<ServiceInstance>> revisionToInstances = new
HashMap<>();
- Map<String, Map<String, Set<String>>> localServiceToRevisions = new
HashMap<>();
+ Map<ServiceInfo, Set<String>> localServiceToRevisions = new
HashMap<>();
// grouping all instances of this app(service name) by revision
for (Map.Entry<String, List<ServiceInstance>> entry :
allInstances.entrySet()) {
@@ -179,55 +176,45 @@ public class ServiceInstancesChangedListener {
}
hasEmptyMetadata = false;
- Map<String, Map<Set<String>, Object>> protocolRevisionsToUrls = new
HashMap<>();
- Map<String, Object> newServiceUrls = new HashMap<>();
- for (Map.Entry<String, Map<String, Set<String>>> entry :
localServiceToRevisions.entrySet()) {
- String protocol = entry.getKey();
- entry.getValue().forEach((protocolServiceKey, revisions) -> {
- Map<Set<String>, Object> revisionsToUrls =
protocolRevisionsToUrls.computeIfAbsent(protocol, k -> new HashMap<>());
- Object urls = revisionsToUrls.get(revisions);
- if (urls == null) {
- urls = getServiceUrlsCache(revisionToInstances, revisions,
protocol);
- revisionsToUrls.put(revisions, urls);
- }
+ Map<String, Map<Integer, Map<Set<String>, Object>>>
protocolRevisionsToUrls = new HashMap<>();
+ Map<String, List<ProtocolServiceKeyWithUrls>> newServiceUrls = new
HashMap<>();
+ for (Map.Entry<ServiceInfo, Set<String>> entry :
localServiceToRevisions.entrySet()) {
+ ServiceInfo serviceInfo = entry.getKey();
+ Set<String> revisions = entry.getValue();
+
+ Map<Integer, Map<Set<String>, Object>> portToRevisions =
protocolRevisionsToUrls.computeIfAbsent(serviceInfo.getProtocol(), k -> new
HashMap<>());
+ Map<Set<String>, Object> revisionsToUrls =
portToRevisions.computeIfAbsent(serviceInfo.getPort(), k -> new HashMap<>());
+ Object urls = revisionsToUrls.get(revisions);
+ if (urls == null) {
+ urls = getServiceUrlsCache(revisionToInstances, revisions,
serviceInfo.getProtocol(), serviceInfo.getPort());
+ revisionsToUrls.put(revisions, urls);
+ }
- newServiceUrls.put(protocolServiceKey, urls);
- });
+ List<ProtocolServiceKeyWithUrls> list =
newServiceUrls.computeIfAbsent(serviceInfo.getPath(), k -> new LinkedList<>());
+ list.add(new
ProtocolServiceKeyWithUrls(serviceInfo.getProtocolServiceKey(), (List<URL>)
urls));
}
this.serviceUrls = newServiceUrls;
this.notifyAddressChanged();
}
- public synchronized void addListenerAndNotify(String serviceKey,
NotifyListener listener) {
+ public synchronized void addListenerAndNotify(URL url, NotifyListener
listener) {
if (destroyed.get()) {
return;
}
- Set<NotifyListenerWithKey> notifyListeners =
this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
- // {@code protocolServiceKeysToConsume} will be specific protocols
configured in reference config or default protocols supported by framework.
- Set<String> protocolServiceKeysToConsume =
getProtocolServiceKeyList(serviceKey, listener);
- // Add current listener to serviceKey set, there will have more than
one listener when multiple references of one same service is configured.
- NotifyListenerWithKey listenerWithKey = new
NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
+ Set<NotifyListenerWithKey> notifyListeners =
this.listeners.computeIfAbsent(url.getServiceKey(), _k -> new
ConcurrentHashSet<>());
+ String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY,
url.getProtocol());
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey(url.getServiceInterface(), url.getVersion(), url.getGroup(),
+ !CommonConstants.CONSUMER.equals(protocol) ? protocol : null);
+ NotifyListenerWithKey listenerWithKey = new
NotifyListenerWithKey(protocolServiceKey, listener);
notifyListeners.add(listenerWithKey);
// Aggregate address and notify on subscription.
- List<URL> urls;
- if (protocolServiceKeysToConsume.size() > 1) {
- urls = new ArrayList<>();
- for (String protocolServiceKey : protocolServiceKeysToConsume) {
- List<URL> urlsOfProtocol = getAddresses(protocolServiceKey,
listener.getConsumerUrl());
- if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
- logger.info(String.format("Found %s urls of protocol
service key %s ", urlsOfProtocol.size(), protocolServiceKey));
- urls.addAll(urlsOfProtocol);
- }
- }
- } else {
- urls =
getAddresses(protocolServiceKeysToConsume.iterator().next(),
listener.getConsumerUrl());
- }
+ List<URL> urls = getAddresses(protocolServiceKey,
listener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(urls)) {
- logger.info(String.format("Notify serviceKey: %s, listener: %s
with %s urls on subscription", serviceKey, listener, urls.size()));
+ logger.info(String.format("Notify serviceKey: %s, listener: %s
with %s urls on subscription", protocolServiceKey, listener, urls.size()));
listener.notify(urls);
}
}
@@ -240,9 +227,7 @@ public class ServiceInstancesChangedListener {
// synchronized method, no need to use DCL
Set<NotifyListenerWithKey> notifyListeners =
this.listeners.get(serviceKey);
if (notifyListeners != null) {
- NotifyListenerWithKey listenerWithKey = new
NotifyListenerWithKey(serviceKey, notifyListener);
- // Remove from global listeners
- notifyListeners.remove(listenerWithKey);
+ notifyListeners.removeIf(listener ->
listener.getNotifyListener().equals(notifyListener));
// ServiceKey has no listener, remove set
if (notifyListeners.size() == 0) {
@@ -341,23 +326,28 @@ public class ServiceInstancesChangedListener {
return emptyMetadataNum;
}
- protected Map<String, Map<String, Set<String>>> parseMetadata(String
revision, MetadataInfo metadata, Map<String, Map<String, Set<String>>>
localServiceToRevisions) {
+ protected Map<ServiceInfo, Set<String>> parseMetadata(String revision,
MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) {
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
- String protocol = entry.getValue().getProtocol();
- String protocolServiceKey = entry.getValue().getMatchKey();
- Map<String, Set<String>> map =
localServiceToRevisions.computeIfAbsent(protocol, _p -> new HashMap<>());
- Set<String> set = map.computeIfAbsent(protocolServiceKey, _k ->
new TreeSet<>());
+ Set<String> set =
localServiceToRevisions.computeIfAbsent(entry.getValue(), _k -> new
TreeSet<>());
set.add(revision);
}
return localServiceToRevisions;
}
- protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>>
revisionToInstances, Set<String> revisions, String protocol) {
+ protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>>
revisionToInstances, Set<String> revisions, String protocol, int port) {
List<URL> urls = new ArrayList<>();
for (String r : revisions) {
for (ServiceInstance i : revisionToInstances.get(r)) {
+ if (port > 0) {
+ if (i.getPort() == port) {
+
urls.add(i.toURL(protocol).setScopeModel(i.getApplicationModel()));
+ } else {
+ urls.add(((DefaultServiceInstance)
i).copyFrom(port).toURL(protocol).setScopeModel(i.getApplicationModel()));
+ }
+ continue;
+ }
// different protocols may have ports specified in meta
if (ServiceInstanceMetadataUtils.hasEndpoints(i)) {
DefaultServiceInstance.Endpoint endpoint =
ServiceInstanceMetadataUtils.getEndpoint(i, protocol);
@@ -372,8 +362,17 @@ public class ServiceInstancesChangedListener {
return urls;
}
- protected List<URL> getAddresses(String serviceProtocolKey, URL
consumerURL) {
- return (List<URL>) serviceUrls.get(serviceProtocolKey);
+ protected List<URL> getAddresses(ProtocolServiceKey protocolServiceKey,
URL consumerURL) {
+ List<ProtocolServiceKeyWithUrls> protocolServiceKeyWithUrlsList =
serviceUrls.get(protocolServiceKey.getInterfaceName());
+ List<URL> urls = new ArrayList<>();
+ if (protocolServiceKeyWithUrlsList != null) {
+ for (ProtocolServiceKeyWithUrls protocolServiceKeyWithUrls :
protocolServiceKeyWithUrlsList) {
+ if (ProtocolServiceKey.Matcher.isMatch(protocolServiceKey,
protocolServiceKeyWithUrls.getProtocolServiceKey())) {
+ urls.addAll(protocolServiceKeyWithUrls.getUrls());
+ }
+ }
+ }
+ return urls;
}
/**
@@ -385,28 +384,10 @@ public class ServiceInstancesChangedListener {
// 2 multiple subscription listener of the same service
for (NotifyListenerWithKey listenerWithKey : listenerSet) {
NotifyListener notifyListener =
listenerWithKey.getNotifyListener();
- if (listenerWithKey.getProtocolServiceKeys().size() == 1) {//
2.1 if one specific protocol is specified
- String protocolServiceKey =
listenerWithKey.getProtocolServiceKeys().iterator().next();
- //FIXME, group wildcard match
- List<URL> urls =
toUrlsWithEmpty(getAddresses(protocolServiceKey,
notifyListener.getConsumerUrl()));
- logger.info("Notify service " + protocolServiceKey + "
with urls " + urls.size());
- notifyListener.notify(urls);
- } else {// 2.2 multiple protocols or no protocol(using default
protocols) set
- List<URL> urls = new ArrayList<>();
- int effectiveProtocolNum = 0;
- for (String protocolServiceKey :
listenerWithKey.getProtocolServiceKeys()) {
- List<URL> tmpUrls = getAddresses(protocolServiceKey,
notifyListener.getConsumerUrl());
- if (CollectionUtils.isNotEmpty(tmpUrls)) {
- logger.info("Found " + urls.size() + " urls of
protocol service key " + protocolServiceKey);
- effectiveProtocolNum++;
- urls.addAll(tmpUrls);
- }
- }
- logger.info("Notify service " + serviceKey + " with " +
urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
- urls = toUrlsWithEmpty(urls);
- notifyListener.notify(urls);
- }
+ List<URL> urls =
toUrlsWithEmpty(getAddresses(listenerWithKey.getProtocolServiceKey(),
notifyListener.getConsumerUrl()));
+ logger.info("Notify service " +
listenerWithKey.getProtocolServiceKey() + " with urls " + urls.size());
+ notifyListener.notify(urls);
}
});
}
@@ -421,9 +402,7 @@ public class ServiceInstancesChangedListener {
if (CollectionUtils.isEmpty(urls) && !emptyProtectionEnabled) {
// notice that the service of this.url may not be the same as
notify listener.
- URL empty = URLBuilder.from(this.url)
- .setProtocol(EMPTY_PROTOCOL)
- .build();
+ URL empty =
URLBuilder.from(this.url).setProtocol(EMPTY_PROTOCOL).build();
urls.add(empty);
}
return urls;
@@ -468,46 +447,6 @@ public class ServiceInstancesChangedListener {
return Objects.hash(getClass(), getServiceNames());
}
- /**
- * Calculate the protocol list that the consumer cares about.
- *
- * @param serviceKey possible input serviceKey includes
- * 1. {group}/{interface}:{version}, if protocol is not
specified
- * 2. {group}/{interface}:{version}:{user specified
protocols}
- * @param listener listener also contains the user specified protocols
- * @return protocol list with the format
{group}/{interface}:{version}:{protocol}
- */
- protected Set<String> getProtocolServiceKeyList(String serviceKey,
NotifyListener listener) {
- if (StringUtils.isEmpty(serviceKey)) {
- return emptySet();
- }
-
- Set<String> result = new HashSet<>();
- String protocol = listener.getConsumerUrl().getParameter(PROTOCOL_KEY);
- if (serviceKey.endsWith(CONSUMER_PROTOCOL_SUFFIX)) {
- serviceKey = serviceKey.substring(0,
serviceKey.indexOf(CONSUMER_PROTOCOL_SUFFIX));
- }
-
- if (StringUtils.isNotEmpty(protocol)) {
- int protocolIndex = serviceKey.indexOf(":" + protocol);
- if (protocol.contains(",") && protocolIndex != -1) {
- serviceKey = serviceKey.substring(0, protocolIndex);
- String[] specifiedProtocols = protocol.split(",");
- for (String specifiedProtocol : specifiedProtocols) {
- result.add(serviceKey + GROUP_CHAR_SEPARATOR +
specifiedProtocol);
- }
- } else {
- result.add(serviceKey);
- }
- } else {
- for (String supportedProtocol : SUPPORTED_PROTOCOLS) {
- result.add(serviceKey + GROUP_CHAR_SEPARATOR +
supportedProtocol);
- }
- }
-
- return result;
- }
-
protected class AddressRefreshRetryTask implements Runnable {
private final RetryServiceInstancesChangedEvent retryEvent;
private final Semaphore retryPermission;
@@ -525,26 +464,16 @@ public class ServiceInstancesChangedListener {
}
public static class NotifyListenerWithKey {
- private final String serviceKey;
- private final Set<String> protocolServiceKeys;
+ private final ProtocolServiceKey protocolServiceKey;
private final NotifyListener notifyListener;
- public NotifyListenerWithKey(String protocolServiceKey, Set<String>
protocolServiceKeys, NotifyListener notifyListener) {
- this.serviceKey = protocolServiceKey;
- this.protocolServiceKeys = (protocolServiceKeys == null ? new
ConcurrentHashSet<>() : protocolServiceKeys);
+ public NotifyListenerWithKey(ProtocolServiceKey protocolServiceKey,
NotifyListener notifyListener) {
+ this.protocolServiceKey = protocolServiceKey;
this.notifyListener = notifyListener;
}
- public NotifyListenerWithKey(String protocolServiceKey, NotifyListener
notifyListener) {
- this(protocolServiceKey, null, notifyListener);
- }
-
- public String getServiceKey() {
- return serviceKey;
- }
-
- public Set<String> getProtocolServiceKeys() {
- return protocolServiceKeys;
+ public ProtocolServiceKey getProtocolServiceKey() {
+ return protocolServiceKey;
}
public NotifyListener getNotifyListener() {
@@ -560,12 +489,30 @@ public class ServiceInstancesChangedListener {
return false;
}
NotifyListenerWithKey that = (NotifyListenerWithKey) o;
- return Objects.equals(serviceKey, that.serviceKey) &&
Objects.equals(notifyListener, that.notifyListener);
+ return Objects.equals(protocolServiceKey, that.protocolServiceKey)
&& Objects.equals(notifyListener, that.notifyListener);
}
@Override
public int hashCode() {
- return Objects.hash(serviceKey, notifyListener);
+ return Objects.hash(protocolServiceKey, notifyListener);
+ }
+ }
+
+ public static class ProtocolServiceKeyWithUrls {
+ private final ProtocolServiceKey protocolServiceKey;
+ private final List<URL> urls;
+
+ public ProtocolServiceKeyWithUrls(ProtocolServiceKey
protocolServiceKey, List<URL> urls) {
+ this.protocolServiceKey = protocolServiceKey;
+ this.urls = urls;
+ }
+
+ public ProtocolServiceKey getProtocolServiceKey() {
+ return protocolServiceKey;
+ }
+
+ public List<URL> getUrls() {
+ return urls;
}
}
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
index 6ce0410b43..40546c73c2 100644
---
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
+++
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.AbstractServiceNameMapping;
import org.apache.dubbo.metadata.ServiceNameMapping;
@@ -43,7 +44,6 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
-import static
org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.metadata.ServiceNameMapping.toStringKeys;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -220,10 +220,10 @@ public class ServiceDiscoveryRegistryTest {
// check different protocol
Map<String,
Set<ServiceInstancesChangedListener.NotifyListenerWithKey>> serviceListeners =
multiAppsInstanceListener.getServiceListeners();
assertEquals(2, serviceListeners.size());
- assertEquals(1,
serviceListeners.get(url.getProtocolServiceKey()).size());
- assertEquals(1,
serviceListeners.get(url2.getProtocolServiceKey()).size());
- String protocolServiceKey = url2.getServiceKey() +
GROUP_CHAR_SEPARATOR + url2.getParameter(PROTOCOL_KEY, DUBBO);
-
assertTrue(serviceListeners.get(url2.getProtocolServiceKey()).contains(new
ServiceInstancesChangedListener.NotifyListenerWithKey(protocolServiceKey,
testServiceListener2)));
+ assertEquals(1, serviceListeners.get(url.getServiceKey()).size());
+ assertEquals(1, serviceListeners.get(url2.getServiceKey()).size());
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey(url2.getServiceInterface(), url2.getVersion(),
url2.getGroup(), url2.getParameter(PROTOCOL_KEY, DUBBO));
+ assertTrue(serviceListeners.get(url2.getServiceKey()).contains(new
ServiceInstancesChangedListener.NotifyListenerWithKey(protocolServiceKey,
testServiceListener2)));
}
/**
diff --git
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
index 7c187eeef1..26f82b4f5b 100644
---
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
+++
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/MockServiceInstancesChangedListener.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
@@ -35,8 +36,8 @@ public class MockServiceInstancesChangedListener extends
ServiceInstancesChanged
}
@Override
- public List<URL> getAddresses(String serviceProtocolKey, URL consumerURL) {
- return super.getAddresses(serviceProtocolKey, consumerURL);
+ public List<URL> getAddresses(ProtocolServiceKey protocolServiceKey, URL
consumerURL) {
+ return super.getAddresses(protocolServiceKey, consumerURL);
}
public Map<String, Set<NotifyListenerWithKey>> getServiceListeners() {
diff --git
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index b8c69614c1..7bab0b8554 100644
---
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.ProtocolServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.LRUCache;
@@ -49,7 +50,6 @@ import org.mockito.stubbing.Answer;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -62,7 +62,6 @@ import static
org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.common.utils.CollectionUtils.isNotEmpty;
import static
org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
@@ -219,7 +218,8 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(1, allInstances.size());
Assertions.assertEquals(3, allInstances.get("app1").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey,
consumerURL);
Assertions.assertEquals(3, serviceUrls.size());
assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
}
@@ -240,7 +240,8 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(1, allInstances.size());
Assertions.assertEquals(3, allInstances.get("app1").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey,
consumerURL);
Assertions.assertEquals(3, serviceUrls.size());
assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
@@ -271,12 +272,16 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(3, allInstances.get("app1").size());
Assertions.assertEquals(4, allInstances.get("app2").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ ProtocolServiceKey protocolServiceKey1 = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new
ProtocolServiceKey(service2, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey3 = new
ProtocolServiceKey(service3, null, null, "dubbo");
+
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey1,
consumerURL);
Assertions.assertEquals(7, serviceUrls.size());
- List<URL> serviceUrls2 = listener.getAddresses(service2 + ":dubbo",
consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(protocolServiceKey2,
consumerURL);
Assertions.assertEquals(4, serviceUrls2.size());
assertTrue(serviceUrls2.get(0).getIp().contains("30.10."));
- List<URL> serviceUrls3 = listener.getAddresses(service3 + ":dubbo",
consumerURL);
+ List<URL> serviceUrls3 = listener.getAddresses(protocolServiceKey3,
consumerURL);
Assertions.assertEquals(2, serviceUrls3.size());
assertTrue(serviceUrls3.get(0).getIp().contains("30.10."));
}
@@ -308,13 +313,17 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(0, allInstances.get("app1").size());
Assertions.assertEquals(4, allInstances.get("app2").size());
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ ProtocolServiceKey protocolServiceKey1 = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new
ProtocolServiceKey(service2, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey3 = new
ProtocolServiceKey(service3, null, null, "dubbo");
+
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey1,
consumerURL);
Assertions.assertEquals(4, serviceUrls.size());
assertTrue(serviceUrls.get(0).getIp().contains("30.10."));
- List<URL> serviceUrls2 = listener.getAddresses(service2 + ":dubbo",
consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(protocolServiceKey2,
consumerURL);
Assertions.assertEquals(4, serviceUrls2.size());
assertTrue(serviceUrls2.get(0).getIp().contains("30.10."));
- List<URL> serviceUrls3 = listener.getAddresses(service3 + ":dubbo",
consumerURL);
+ List<URL> serviceUrls3 = listener.getAddresses(protocolServiceKey3,
consumerURL);
Assertions.assertEquals(2, serviceUrls3.size());
assertTrue(serviceUrls3.get(0).getIp().contains("30.10."));
@@ -328,9 +337,9 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(0, allInstances_app2.get("app1").size());
Assertions.assertEquals(0, allInstances_app2.get("app2").size());
- assertTrue(isEmpty(listener.getAddresses(service1 + ":dubbo",
consumerURL)));
- assertTrue(isEmpty(listener.getAddresses(service2 + ":dubbo",
consumerURL)));
- assertTrue(isEmpty(listener.getAddresses(service3 + ":dubbo",
consumerURL)));
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey1,
consumerURL)));
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey2,
consumerURL)));
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey3,
consumerURL)));
}
// 正常场景。检查instance listener -> service listener(Directory)地址推送流程
@@ -345,8 +354,8 @@ public class ServiceInstancesChangedListenerTest {
when(demoServiceListener.getConsumerUrl()).thenReturn(consumerURL);
NotifyListener demoService2Listener =
Mockito.mock(NotifyListener.class);
when(demoService2Listener.getConsumerUrl()).thenReturn(consumerURL2);
- listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(),
demoServiceListener);
- listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(),
demoService2Listener);
+ listener.addListenerAndNotify(consumerURL, demoServiceListener);
+ listener.addListenerAndNotify(consumerURL2, demoService2Listener);
// notify app1 instance change
ServiceInstancesChangedEvent app1_event = new
ServiceInstancesChangedEvent("app1", app1Instances);
listener.onEvent(app1_event);
@@ -378,7 +387,7 @@ public class ServiceInstancesChangedListenerTest {
// test service listener still get notified when added after instance
notification.
NotifyListener demoService3Listener =
Mockito.mock(NotifyListener.class);
when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
- listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(),
demoService3Listener);
+ listener.addListenerAndNotify(consumerURL3, demoService3Listener);
Mockito.verify(demoService3Listener,
Mockito.times(1)).notify(Mockito.anyList());
}
@@ -397,10 +406,10 @@ public class ServiceInstancesChangedListenerTest {
when(demoService2Listener1.getConsumerUrl()).thenReturn(consumerURL2);
NotifyListener demoService2Listener2 =
Mockito.mock(NotifyListener.class);
when(demoService2Listener2.getConsumerUrl()).thenReturn(consumerURL2);
- listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(),
demoServiceListener1);
- listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(),
demoServiceListener2);
- listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(),
demoService2Listener1);
- listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(),
demoService2Listener2);
+ listener.addListenerAndNotify(consumerURL, demoServiceListener1);
+ listener.addListenerAndNotify(consumerURL, demoServiceListener2);
+ listener.addListenerAndNotify(consumerURL2, demoService2Listener1);
+ listener.addListenerAndNotify(consumerURL2, demoService2Listener2);
// notify app1 instance change
ServiceInstancesChangedEvent app1_event = new
ServiceInstancesChangedEvent("app1", app1Instances);
listener.onEvent(app1_event);
@@ -432,7 +441,7 @@ public class ServiceInstancesChangedListenerTest {
// test service listener still get notified when added after instance
notification.
NotifyListener demoService3Listener =
Mockito.mock(NotifyListener.class);
when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
- listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(),
demoService3Listener);
+ listener.addListenerAndNotify(consumerURL3, demoService3Listener);
Mockito.verify(demoService3Listener,
Mockito.times(1)).notify(Mockito.anyList());
}
@@ -448,15 +457,15 @@ public class ServiceInstancesChangedListenerTest {
// no protocol specified, consume all instances
NotifyListener demoServiceListener1 =
Mockito.mock(NotifyListener.class);
when(demoServiceListener1.getConsumerUrl()).thenReturn(noProtocolConsumerURL);
-
listener.addListenerAndNotify(noProtocolConsumerURL.getProtocolServiceKey(),
demoServiceListener1);
+ listener.addListenerAndNotify(noProtocolConsumerURL,
demoServiceListener1);
// multiple protocols specified
NotifyListener demoServiceListener2 =
Mockito.mock(NotifyListener.class);
when(demoServiceListener2.getConsumerUrl()).thenReturn(multipleProtocolsConsumerURL);
-
listener.addListenerAndNotify(multipleProtocolsConsumerURL.getProtocolServiceKey(),
demoServiceListener2);
+ listener.addListenerAndNotify(multipleProtocolsConsumerURL,
demoServiceListener2);
// one protocol specified
NotifyListener demoServiceListener3 =
Mockito.mock(NotifyListener.class);
when(demoServiceListener3.getConsumerUrl()).thenReturn(singleProtocolsConsumerURL);
-
listener.addListenerAndNotify(singleProtocolsConsumerURL.getProtocolServiceKey(),
demoServiceListener3);
+ listener.addListenerAndNotify(singleProtocolsConsumerURL,
demoServiceListener3);
// notify app1 instance change
ServiceInstancesChangedEvent app1_event = new
ServiceInstancesChangedEvent("app1", app1InstancesMultipleProtocols);
@@ -479,9 +488,119 @@ public class ServiceInstancesChangedListenerTest {
Assertions.assertEquals(1, single_protocol_notifiedUrls.size());
}
- // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
+ /**
+ * Test subscribe multiple groups
+ */
@Test
@Order(8)
+ public void testSubscribeMultipleGroups() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ listener = new ServiceInstancesChangedListener(serviceNames,
serviceDiscovery);
+
+ // notify instance change
+ ServiceInstancesChangedEvent event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(event);
+
+ Map<String, List<ServiceInstance>> allInstances =
listener.getAllInstances();
+ Assertions.assertEquals(1, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey,
consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "",
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, ",group1",
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "group1,",
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "*",
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null, "group1",
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(0, serviceUrls.size());
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null,
"group1,group2", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(0, serviceUrls.size());
+
+ protocolServiceKey = new ProtocolServiceKey(service1, null,
"group1,,group2", "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+ }
+
+ /**
+ * Test subscribe multiple versions
+ */
+ @Test
+ @Order(9)
+ public void testSubscribeMultipleVersions() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ listener = new ServiceInstancesChangedListener(serviceNames,
serviceDiscovery);
+
+ // notify instance change
+ ServiceInstancesChangedEvent event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(event);
+
+ Map<String, List<ServiceInstance>> allInstances =
listener.getAllInstances();
+ Assertions.assertEquals(1, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+
+ ProtocolServiceKey protocolServiceKey = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey,
consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "", null,
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "*", null,
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, ",1.0.0", null,
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "1.0.0,", null,
"dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "1.0.0,,1.0.1",
null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ assertTrue(serviceUrls.get(0) instanceof InstanceAddressURL);
+
+ protocolServiceKey = new ProtocolServiceKey(service1, "1.0.1,1.0.0",
null, "dubbo");
+ serviceUrls = listener.getAddresses(protocolServiceKey, consumerURL);
+ Assertions.assertEquals(0, serviceUrls.size());
+ }
+
+ // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
+ @Test
+ @Order(10)
public void testRevisionFailureOnStartup() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -490,8 +609,12 @@ public class ServiceInstancesChangedListenerTest {
ServiceInstancesChangedEvent failed_revision_event = new
ServiceInstancesChangedEvent("app1", app1FailedInstances);
listener.onEvent(failed_revision_event);
- List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
- List<URL> serviceUrls2 = listener.getAddresses(service2 + ":dubbo",
consumerURL);
+
+ ProtocolServiceKey protocolServiceKey1 = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new
ProtocolServiceKey(service2, null, null, "dubbo");
+
+ List<URL> serviceUrls = listener.getAddresses(protocolServiceKey1,
consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(protocolServiceKey2,
consumerURL);
assertTrue(isNotEmpty(serviceUrls));
assertTrue(isNotEmpty(serviceUrls2));
@@ -499,7 +622,7 @@ public class ServiceInstancesChangedListenerTest {
// revision 异常场景。运行中地址通知,拿不到revision就用老版本revision
@Test
- @Order(9)
+ @Order(11)
public void testRevisionFailureOnNotification() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -524,8 +647,11 @@ public class ServiceInstancesChangedListenerTest {
listener.onEvent(event2);
// event2 did not really take effect
- Assertions.assertEquals(3, listener.getAddresses(service1 + ":dubbo",
consumerURL).size());
- assertTrue(isEmpty(listener.getAddresses(service2 + ":dubbo",
consumerURL)));
+ ProtocolServiceKey protocolServiceKey1 = new
ProtocolServiceKey(service1, null, null, "dubbo");
+ ProtocolServiceKey protocolServiceKey2 = new
ProtocolServiceKey(service2, null, null, "dubbo");
+
+ Assertions.assertEquals(3, listener.getAddresses(protocolServiceKey1,
consumerURL).size());
+ assertTrue(isEmpty(listener.getAddresses(protocolServiceKey2,
consumerURL)));
//
init();
@@ -536,16 +662,16 @@ public class ServiceInstancesChangedListenerTest {
e.printStackTrace();
}
// check recovered after retry.
- List<URL> serviceUrls_after_retry = listener.getAddresses(service1 +
":dubbo", consumerURL);
+ List<URL> serviceUrls_after_retry =
listener.getAddresses(protocolServiceKey1, consumerURL);
Assertions.assertEquals(5, serviceUrls_after_retry.size());
- List<URL> serviceUrls2_after_retry = listener.getAddresses(service2 +
":dubbo", consumerURL);
+ List<URL> serviceUrls2_after_retry =
listener.getAddresses(protocolServiceKey2, consumerURL);
Assertions.assertEquals(2, serviceUrls2_after_retry.size());
}
// Abnormal case. Instance does not have revision
@Test
- @Order(10)
+ @Order(12)
public void testInstanceWithoutRevision() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -559,39 +685,6 @@ public class ServiceInstancesChangedListenerTest {
assertTrue(true);
}
- /**
- * Test calculation of subscription protocols
- */
- @Test
- public void testGetProtocolServiceKeyList() {
- NotifyListener listener = Mockito.mock(NotifyListener.class);
-
- Set<String> serviceNames = new HashSet<>();
- serviceNames.add("app1");
- ServiceDiscovery serviceDiscovery =
Mockito.mock(ServiceDiscovery.class);
- ServiceInstancesChangedListener instancesChangedListener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
-
- URL url1 = URL.valueOf("tri://localhost/Service?protocol=tri");
- when(listener.getConsumerUrl()).thenReturn(url1);
- Set<String> keyList11 =
instancesChangedListener.getProtocolServiceKeyList(url1.getProtocolServiceKey(),
listener);
- assertEquals(getExpectedSet(Arrays.asList("Service:tri")), keyList11);
-
- URL url2 =
URL.valueOf("consumer://localhost/Service?group=group&version=1.0");
- when(listener.getConsumerUrl()).thenReturn(url2);
- Set<String> keyList12 =
instancesChangedListener.getProtocolServiceKeyList(url2.getProtocolServiceKey(),
listener);
- assertEquals(getExpectedSet(Arrays.asList("group/Service:1.0:tri",
"group/Service:1.0:dubbo", "group/Service:1.0:rest")), keyList12);
-
- URL url3 =
URL.valueOf("dubbo://localhost/Service?protocol=dubbo&group=group&version=1.0");
- when(listener.getConsumerUrl()).thenReturn(url3);
- Set<String> keyList21 =
instancesChangedListener.getProtocolServiceKeyList(url3.getProtocolServiceKey(),
listener);
- assertEquals(getExpectedSet(Arrays.asList("group/Service:1.0:dubbo")),
keyList21);
-
- URL url4 =
URL.valueOf("dubbo,tri://localhost/Service?protocol=dubbo,tri&group=group&version=1.0");
- when(listener.getConsumerUrl()).thenReturn(url4);
- Set<String> keyList23 =
instancesChangedListener.getProtocolServiceKeyList(url4.getProtocolServiceKey(),
listener);
- assertEquals(getExpectedSet(Arrays.asList("group/Service:1.0:dubbo",
"group/Service:1.0:tri")), keyList23);
- }
-
Set<String> getExpectedSet(List<String> list) {
return new HashSet<>(list);
}