This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d42b93dfda fixes #10079, address notification issue with service 
discovery multi subscription (#10080)
d42b93dfda is described below

commit d42b93dfda471b448cf6ccea364ba402e58615ce
Author: ken.lj <[email protected]>
AuthorDate: Tue May 24 14:42:09 2022 +0800

    fixes #10079, address notification issue with service discovery multi 
subscription (#10080)
---
 .../src/main/java/org/apache/dubbo/common/URL.java |   4 +-
 .../listener/ServiceInstancesChangedListener.java  | 109 +++++++++---------
 .../ServiceInstancesChangedListenerTest.java       | 123 ++++++++++++++++++++-
 3 files changed, 178 insertions(+), 58 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index c8078430d3..fb9f1b65a5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -1402,8 +1402,8 @@ class URL implements Serializable {
         }
         this.protocolServiceKey = getServiceKey();
         /*
-        Special treatment if this is a consumer subscription url instance with 
no protocol specified - starts with 'consumer://'
-        If the specific protocol is specified on the consumer side, then this 
method will return as normal.
+        Special treatment for urls that begin with 'consumer://', that is, a 
consumer subscription url instance with no protocol specified.
+        If a protocol is specified on the consumer side, then this method will 
return as normal.
         */
         if (!CONSUMER.equals(getProtocol())) {
             this.protocolServiceKey += (GROUP_CHAR_SEPARATOR + getProtocol());
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 3604b18d43..3142f5f202 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -204,33 +204,30 @@ public class ServiceInstancesChangedListener {
             return;
         }
 
-        Set<String> protocolServiceKeys = 
getProtocolServiceKeyList(serviceKey, listener);
-        for (String protocolServiceKey : protocolServiceKeys) {
-            // Add to global listeners
-            if (!this.listeners.containsKey(serviceKey)) {
-                // synchronized method, no need to use DCL
-                this.listeners.put(serviceKey, new ConcurrentHashSet<>());
-            }
-            Set<NotifyListenerWithKey> notifyListeners = 
this.listeners.get(serviceKey);
-            notifyListeners.add(new NotifyListenerWithKey(protocolServiceKey, 
listener));
-        }
-
+        Set<NotifyListenerWithKey> notifyListeners = 
this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
+        // {@code protocolServiceKeysToConsume} will be specific protocols 
configured in reference config or default protocols supported by framework.
+        Set<String> protocolServiceKeysToConsume = 
getProtocolServiceKeyList(serviceKey, listener);
+        // Add current listener to serviceKey set; there will be more than 
one listener when multiple references of the same service are configured.
+        NotifyListenerWithKey listenerWithKey = new 
NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
+        notifyListeners.add(listenerWithKey);
+
+        // Aggregate address and notify on subscription.
         List<URL> urls;
-        if (protocolServiceKeys.size() > 1) {
+        if (protocolServiceKeysToConsume.size() > 1) {
             urls = new ArrayList<>();
-            for (NotifyListenerWithKey notifyListenerWithKey : 
this.listeners.get(serviceKey)) {
-                String protocolKey = 
notifyListenerWithKey.getProtocolServiceKey();
-                List<URL> urlsOfProtocol = getAddresses(protocolKey, 
listener.getConsumerUrl());
+            for (String protocolServiceKey : protocolServiceKeysToConsume) {
+                List<URL> urlsOfProtocol = getAddresses(protocolServiceKey, 
listener.getConsumerUrl());
                 if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
+                    logger.info(String.format("Found %s urls of protocol 
service key %s ", urlsOfProtocol.size(), protocolServiceKey));
                     urls.addAll(urlsOfProtocol);
                 }
             }
         } else {
-            String protocolKey = 
this.listeners.get(serviceKey).iterator().next().getProtocolServiceKey();
-            urls = getAddresses(protocolKey, listener.getConsumerUrl());
+            urls = 
getAddresses(protocolServiceKeysToConsume.iterator().next(), 
listener.getConsumerUrl());
         }
 
         if (CollectionUtils.isNotEmpty(urls)) {
+            logger.info(String.format("Notify serviceKey: %s, listener: %s 
with %s urls on subscription", serviceKey, listener, urls.size()));
             listener.notify(urls);
         }
     }
@@ -240,18 +237,16 @@ public class ServiceInstancesChangedListener {
             return;
         }
 
-        for (String protocolServiceKey : getProtocolServiceKeyList(serviceKey, 
notifyListener)) {
-            // synchronized method, no need to use DCL
-            Set<NotifyListenerWithKey> notifyListeners = 
this.listeners.get(serviceKey);
-            if (notifyListeners != null) {
-                NotifyListenerWithKey listenerWithKey = new 
NotifyListenerWithKey(protocolServiceKey, notifyListener);
-                // Remove from global listeners
-                notifyListeners.remove(listenerWithKey);
+        // synchronized method, no need to use DCL
+        Set<NotifyListenerWithKey> notifyListeners = 
this.listeners.get(serviceKey);
+        if (notifyListeners != null) {
+            NotifyListenerWithKey listenerWithKey = new 
NotifyListenerWithKey(serviceKey, notifyListener);
+            // Remove from global listeners
+            notifyListeners.remove(listenerWithKey);
 
-                // ServiceKey has no listener, remove set
-                if (notifyListeners.size() == 0) {
-                    this.listeners.remove(serviceKey);
-                }
+            // ServiceKey has no listener, remove set
+            if (notifyListeners.size() == 0) {
+                this.listeners.remove(serviceKey);
             }
         }
     }
@@ -385,32 +380,32 @@ public class ServiceInstancesChangedListener {
      * race condition is protected by onEvent/doOnEvent
      */
     protected void notifyAddressChanged() {
+        // 1 different services
         listeners.forEach((serviceKey, listenerSet) -> {
-            if (listenerSet != null) {
-                if (listenerSet.size() == 1) {
-                    NotifyListenerWithKey listenerWithKey = 
listenerSet.iterator().next();
-                    String protocolServiceKey = 
listenerWithKey.getProtocolServiceKey();
-                    NotifyListener notifyListener = 
listenerWithKey.getNotifyListener();
+            // 2 multiple subscription listener of the same service
+            for (NotifyListenerWithKey listenerWithKey : listenerSet) {
+                NotifyListener notifyListener = 
listenerWithKey.getNotifyListener();
+                if (listenerWithKey.getProtocolServiceKeys().size() == 1) {// 
2.1 if one specific protocol is specified
+                    String protocolServiceKey = 
listenerWithKey.getProtocolServiceKeys().iterator().next();
                     //FIXME, group wildcard match
                     List<URL> urls = 
toUrlsWithEmpty(getAddresses(protocolServiceKey, 
notifyListener.getConsumerUrl()));
-                    logger.info("Notify service " + serviceKey + " with urls " 
+ urls.size());
+                    logger.info("Notify service " + protocolServiceKey + " 
with urls " + urls.size());
                     notifyListener.notify(urls);
-                } else {
+                } else {// 2.2 multiple protocols or no protocol(using default 
protocols) set
                     List<URL> urls = new ArrayList<>();
-                    NotifyListener notifyListener = null;
-                    for (NotifyListenerWithKey listenerWithKey : listenerSet) {
-                        String protocolServiceKey = 
listenerWithKey.getProtocolServiceKey();
-                        notifyListener = listenerWithKey.getNotifyListener();
+                    int effectiveProtocolNum = 0;
+                    for (String protocolServiceKey : 
listenerWithKey.getProtocolServiceKeys()) {
                         List<URL> tmpUrls = getAddresses(protocolServiceKey, 
notifyListener.getConsumerUrl());
                         if (CollectionUtils.isNotEmpty(tmpUrls)) {
+                            logger.info("Found  " + urls.size() + " urls of 
protocol service key " + protocolServiceKey);
+                            effectiveProtocolNum++;
                             urls.addAll(tmpUrls);
                         }
                     }
-                    if (notifyListener != null) {
-                        logger.info("Notify service " + serviceKey + " with 
urls " + urls.size());
-                        urls = toUrlsWithEmpty(urls);
-                        notifyListener.notify(urls);
-                    }
+
+                    logger.info("Notify service " + serviceKey + " with " + 
urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
+                    urls = toUrlsWithEmpty(urls);
+                    notifyListener.notify(urls);
                 }
             }
         });
@@ -477,7 +472,7 @@ public class ServiceInstancesChangedListener {
      * Calculate the protocol list that the consumer cares about.
      *
      * @param serviceKey possible input serviceKey includes
-     *                   1. {group}/{interface}:{version}:consumer
+     *                   1. {group}/{interface}:{version}, if protocol is not 
specified
      *                   2. {group}/{interface}:{version}:{user specified 
protocols}
      * @param listener   listener also contains the user specified protocols
      * @return protocol list with the format 
{group}/{interface}:{version}:{protocol}
@@ -530,16 +525,26 @@ public class ServiceInstancesChangedListener {
     }
 
     public static class NotifyListenerWithKey {
-        private final String protocolServiceKey;
+        private final String serviceKey;
+        private final Set<String> protocolServiceKeys;
         private final NotifyListener notifyListener;
 
-        public NotifyListenerWithKey(String protocolServiceKey, NotifyListener 
notifyListener) {
-            this.protocolServiceKey = protocolServiceKey;
+        public NotifyListenerWithKey(String protocolServiceKey, Set<String> 
protocolServiceKeys, NotifyListener notifyListener) {
+            this.serviceKey = protocolServiceKey;
+            this.protocolServiceKeys = (protocolServiceKeys == null ? new 
ConcurrentHashSet<>() : protocolServiceKeys);
             this.notifyListener = notifyListener;
         }
 
-        public String getProtocolServiceKey() {
-            return protocolServiceKey;
+        public NotifyListenerWithKey(String protocolServiceKey, NotifyListener 
notifyListener) {
+            this(protocolServiceKey, null, notifyListener);
+        }
+
+        public String getServiceKey() {
+            return serviceKey;
+        }
+
+        public Set<String> getProtocolServiceKeys() {
+            return protocolServiceKeys;
         }
 
         public NotifyListener getNotifyListener() {
@@ -555,12 +560,12 @@ public class ServiceInstancesChangedListener {
                 return false;
             }
             NotifyListenerWithKey that = (NotifyListenerWithKey) o;
-            return Objects.equals(protocolServiceKey, that.protocolServiceKey) 
&& Objects.equals(notifyListener, that.notifyListener);
+            return Objects.equals(serviceKey, that.serviceKey) && 
Objects.equals(notifyListener, that.notifyListener);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(protocolServiceKey, notifyListener);
+            return Objects.hash(serviceKey, notifyListener);
         }
     }
 }
diff --git 
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
 
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index 3c0734237c..c4a06f2131 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -82,6 +82,7 @@ public class ServiceInstancesChangedListenerTest {
     static List<ServiceInstance> app1FailedInstances;
     static List<ServiceInstance> app1FailedInstances2;
     static List<ServiceInstance> app1InstancesWithNoRevision;
+    static List<ServiceInstance> app1InstancesMultipleProtocols;
 
     static String metadata_111 = 
"{\"app\":\"app1\",\"revision\":\"111\",\"services\":{"
         + 
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\
 [...]
@@ -99,6 +100,10 @@ public class ServiceInstancesChangedListenerTest {
     static String metadata_444 = 
"{\"app\":\"app1\",\"revision\":\"444\",\"services\":{"
         + 
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\
 [...]
         + "}}";
+    // only triple protocol enabled
+    static String metadata_555_triple = 
"{\"app\":\"app1\",\"revision\":\"555\",\"services\":{"
+        + 
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"tri\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\":
 [...]
+        + "}}";
 
     static String service1 = "org.apache.dubbo.demo.DemoService";
     static String service2 = "org.apache.dubbo.demo.DemoService2";
@@ -107,12 +112,16 @@ public class ServiceInstancesChangedListenerTest {
     static URL consumerURL = 
URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=dubbo&registry_cluster=default");
     static URL consumerURL2 = 
URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService2?interface=org.apache.dubbo.demo.DemoService2&protocol=dubbo&registry_cluster=default");
     static URL consumerURL3 = 
URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService3?interface=org.apache.dubbo.demo.DemoService3&protocol=dubbo&registry_cluster=default");
+    static URL multipleProtocolsConsumerURL = 
URL.valueOf("dubbo,tri://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=dubbo,tri&registry_cluster=default");
+    static URL noProtocolConsumerURL = 
URL.valueOf("consumer://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&registry_cluster=default");
+    static URL singleProtocolsConsumerURL = 
URL.valueOf("tri://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=tri&registry_cluster=default");
     static URL registryURL = 
URL.valueOf("dubbo://127.0.0.1:2181/org.apache.dubbo.demo.RegistryService");
 
     static MetadataInfo metadataInfo_111;
     static MetadataInfo metadataInfo_222;
     static MetadataInfo metadataInfo_333;
     static MetadataInfo metadataInfo_444;
+    static MetadataInfo metadataInfo_555_tri;
 
     static MetadataService metadataService;
 
@@ -149,16 +158,22 @@ public class ServiceInstancesChangedListenerTest {
         List<Object> urlsWithoutRevision = new ArrayList<>();
         urlsWithoutRevision.add("30.10.0.1:20880");
 
+        List<Object> urlsMultipleProtocols = new ArrayList<>();
+        urlsMultipleProtocols.add("30.10.0.1:20880?revision=555");//triple
+        urlsMultipleProtocols.addAll(urlsSameRevision);// dubbo
+
         app1Instances = buildInstances(urlsSameRevision);
         app2Instances = buildInstances(urlsDifferentRevision);
         app1FailedInstances = buildInstances(urlsFailedRevision);
         app1FailedInstances2 = buildInstances(urlsFailedRevision2);
         app1InstancesWithNoRevision = buildInstances(urlsWithoutRevision);
+        app1InstancesMultipleProtocols = buildInstances(urlsMultipleProtocols);
 
         metadataInfo_111 = gson.fromJson(metadata_111, MetadataInfo.class);
         metadataInfo_222 = gson.fromJson(metadata_222, MetadataInfo.class);
         metadataInfo_333 = gson.fromJson(metadata_333, MetadataInfo.class);
         metadataInfo_444 = gson.fromJson(metadata_444, MetadataInfo.class);
+        metadataInfo_555_tri = gson.fromJson(metadata_555_triple, 
MetadataInfo.class);
 
         serviceDiscovery = Mockito.mock(ServiceDiscovery.class);
         when(serviceDiscovery.getUrl()).thenReturn(registryURL);
@@ -167,6 +182,7 @@ public class ServiceInstancesChangedListenerTest {
         when(serviceDiscovery.getRemoteMetadata(eq("222"), 
anyList())).thenReturn(metadataInfo_222);
         when(serviceDiscovery.getRemoteMetadata(eq("333"), 
anyList())).thenReturn(metadataInfo_333);
         when(serviceDiscovery.getRemoteMetadata(eq("444"), 
anyList())).thenReturn(MetadataInfo.EMPTY);
+        when(serviceDiscovery.getRemoteMetadata(eq("555"), 
anyList())).thenReturn(metadataInfo_555_tri);
     }
 
 
@@ -367,9 +383,106 @@ public class ServiceInstancesChangedListenerTest {
         Mockito.verify(demoService3Listener, 
Mockito.times(1)).notify(Mockito.anyList());
     }
 
-    // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
     @Test
     @Order(6)
+    public void testMultiServiceListenerNotification() {
+        Set<String> serviceNames = new HashSet<>();
+        serviceNames.add("app1");
+        serviceNames.add("app2");
+        listener = new ServiceInstancesChangedListener(serviceNames, 
serviceDiscovery);
+        NotifyListener demoServiceListener1 = 
Mockito.mock(NotifyListener.class);
+        when(demoServiceListener1.getConsumerUrl()).thenReturn(consumerURL);
+        NotifyListener demoServiceListener2 = 
Mockito.mock(NotifyListener.class);
+        when(demoServiceListener2.getConsumerUrl()).thenReturn(consumerURL);
+        NotifyListener demoService2Listener1 = 
Mockito.mock(NotifyListener.class);
+        when(demoService2Listener1.getConsumerUrl()).thenReturn(consumerURL2);
+        NotifyListener demoService2Listener2 = 
Mockito.mock(NotifyListener.class);
+        when(demoService2Listener2.getConsumerUrl()).thenReturn(consumerURL2);
+        listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), 
demoServiceListener1);
+        listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), 
demoServiceListener2);
+        listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), 
demoService2Listener1);
+        listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), 
demoService2Listener2);
+        // notify app1 instance change
+        ServiceInstancesChangedEvent app1_event = new 
ServiceInstancesChangedEvent("app1", app1Instances);
+        listener.onEvent(app1_event);
+
+        // check
+        ArgumentCaptor<List<URL>> captor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener1, 
Mockito.times(1)).notify(captor.capture());
+        List<URL> notifiedUrls = captor.getValue();
+        Assertions.assertEquals(3, notifiedUrls.size());
+        ArgumentCaptor<List<URL>> captor2 = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoService2Listener1, 
Mockito.times(1)).notify(captor2.capture());
+        List<URL> notifiedUrls2 = captor2.getValue();
+        Assertions.assertEquals(0, notifiedUrls2.size());
+
+        // notify app2 instance change
+        ServiceInstancesChangedEvent app2_event = new 
ServiceInstancesChangedEvent("app2", app2Instances);
+        listener.onEvent(app2_event);
+
+        // check
+        ArgumentCaptor<List<URL>> app2_captor = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener1, 
Mockito.times(2)).notify(app2_captor.capture());
+        List<URL> app2_notifiedUrls = app2_captor.getValue();
+        Assertions.assertEquals(7, app2_notifiedUrls.size());
+        ArgumentCaptor<List<URL>> app2_captor2 = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoService2Listener1, 
Mockito.times(2)).notify(app2_captor2.capture());
+        List<URL> app2_notifiedUrls2 = app2_captor2.getValue();
+        Assertions.assertEquals(4, app2_notifiedUrls2.size());
+
+        // test that a service listener still gets notified when added after instance 
notification.
+        NotifyListener demoService3Listener = 
Mockito.mock(NotifyListener.class);
+        when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
+        listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(), 
demoService3Listener);
+        Mockito.verify(demoService3Listener, 
Mockito.times(1)).notify(Mockito.anyList());
+    }
+
+    /**
+     * Test subscribe multiple protocols
+     */
+    @Test
+    @Order(7)
+    public void testSubscribeMultipleProtocols() {
+        Set<String> serviceNames = new HashSet<>();
+        serviceNames.add("app1");
+        listener = new ServiceInstancesChangedListener(serviceNames, 
serviceDiscovery);
+        // no protocol specified, consume all instances
+        NotifyListener demoServiceListener1 = 
Mockito.mock(NotifyListener.class);
+        
when(demoServiceListener1.getConsumerUrl()).thenReturn(noProtocolConsumerURL);
+        
listener.addListenerAndNotify(noProtocolConsumerURL.getProtocolServiceKey(), 
demoServiceListener1);
+        // multiple protocols specified
+        NotifyListener demoServiceListener2 = 
Mockito.mock(NotifyListener.class);
+        
when(demoServiceListener2.getConsumerUrl()).thenReturn(multipleProtocolsConsumerURL);
+        
listener.addListenerAndNotify(multipleProtocolsConsumerURL.getProtocolServiceKey(),
 demoServiceListener2);
+        // one protocol specified
+        NotifyListener demoServiceListener3 = 
Mockito.mock(NotifyListener.class);
+        
when(demoServiceListener3.getConsumerUrl()).thenReturn(singleProtocolsConsumerURL);
+        
listener.addListenerAndNotify(singleProtocolsConsumerURL.getProtocolServiceKey(),
 demoServiceListener3);
+
+        // notify app1 instance change
+        ServiceInstancesChangedEvent app1_event = new 
ServiceInstancesChangedEvent("app1", app1InstancesMultipleProtocols);
+        listener.onEvent(app1_event);
+
+        // check that instances exposing framework-supported default 
protocols (currently dubbo, triple and rest) are notified
+        ArgumentCaptor<List<URL>> default_protocol_captor = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener1, 
Mockito.times(1)).notify(default_protocol_captor.capture());
+        List<URL> default_protocol_notifiedUrls = 
default_protocol_captor.getValue();
+        Assertions.assertEquals(4, default_protocol_notifiedUrls.size());
+        // check that instances exposing protocols in the consuming list (dubbo and 
triple) are notified
+        ArgumentCaptor<List<URL>> multi_protocols_captor = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener2, 
Mockito.times(1)).notify(multi_protocols_captor.capture());
+        List<URL> multi_protocol_notifiedUrls = 
multi_protocols_captor.getValue();
+        Assertions.assertEquals(4, multi_protocol_notifiedUrls.size());
+        // check that instances exposing protocols in the consuming list (only triple) are 
notified
+        ArgumentCaptor<List<URL>> single_protocols_captor = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener3, 
Mockito.times(1)).notify(single_protocols_captor.capture());
+        List<URL> single_protocol_notifiedUrls = 
single_protocols_captor.getValue();
+        Assertions.assertEquals(1, single_protocol_notifiedUrls.size());
+    }
+
+    // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
+    @Test
+    @Order(8)
     public void testRevisionFailureOnStartup() {
         Set<String> serviceNames = new HashSet<>();
         serviceNames.add("app1");
@@ -387,7 +500,7 @@ public class ServiceInstancesChangedListenerTest {
 
     // revision 异常场景。运行中地址通知,拿不到revision就用老版本revision
     @Test
-    @Order(7)
+    @Order(9)
     public void testRevisionFailureOnNotification() {
         Set<String> serviceNames = new HashSet<>();
         serviceNames.add("app1");
@@ -431,10 +544,9 @@ public class ServiceInstancesChangedListenerTest {
 
     }
 
-
     // Abnormal case. Instance does not have revision
     @Test
-    @Order(9)
+    @Order(10)
     public void testInstanceWithoutRevision() {
         Set<String> serviceNames = new HashSet<>();
         serviceNames.add("app1");
@@ -448,6 +560,9 @@ public class ServiceInstancesChangedListenerTest {
         assertTrue(true);
     }
 
+    /**
+     * Test calculation of subscription protocols
+     */
     @Test
     public void testGetProtocolServiceKeyList() {
         NotifyListener listener = Mockito.mock(NotifyListener.class);

Reply via email to