This is an automated email from the ASF dual-hosted git repository.

hufeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-js.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f7d456  enhancement: add dubbo-consumer scheduler and dubbo-cluseter refresh
8f7d456 is described below

commit 8f7d4561d09ce08fb9b5a3c12a9687e0fc7dd681
Author: hufeng <[email protected]>
AuthorDate: Mon Aug 9 16:27:38 2021 +0800

    enhancement: add dubbo-consumer scheduler and dubbo-cluseter refresh
---
 packages/dubbo-consumer/src/dubbo-cluster.ts | 46 +++++++++++++++++++---------
 packages/dubbo-consumer/src/scheduler.ts     | 21 +++++++++++++
 2 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/packages/dubbo-consumer/src/dubbo-cluster.ts b/packages/dubbo-consumer/src/dubbo-cluster.ts
index b6ee3ba..123fcd3 100644
--- a/packages/dubbo-consumer/src/dubbo-cluster.ts
+++ b/packages/dubbo-consumer/src/dubbo-cluster.ts
@@ -34,7 +34,8 @@ const log = debug('dubbo:dubbo-cluster')
  */
 
 export default class DubboCluster
-  implements IDubboObservable<IDubboTransportSubscriber> {
+  implements IDubboObservable<IDubboTransportSubscriber>
+{
   private subscriber: IDubboTransportSubscriber
   private readonly dubboClusterTransportMap: Map<
     HostName,
@@ -53,21 +54,19 @@ export default class DubboCluster
 
   // ~~~~~~~~~~~~~~~~~~~~private methods~~~~~~~~~~~~~~~~~~~~~~~~~
 
-  private handleTransportClose = (
-    transport: DubboTcpTransport,
-    hostname: string
-  ) => (host: string) => {
-    log('receive dubbo-tcp-transport closed %s', transport.host)
-    if (!this.dubboClusterTransportMap.has(hostname)) {
-      return
-    }
+  private handleTransportClose =
+    (transport: DubboTcpTransport, hostname: string) => (host: string) => {
+      log('receive dubbo-tcp-transport closed %s', transport.host)
+      if (!this.dubboClusterTransportMap.has(hostname)) {
+        return
+      }
 
-    const transports = this.dubboClusterTransportMap.get(hostname)
-    log('delete dubbo-tcp-transport %s', transport.host)
-    transports.delete(transport)
-    log('current dubbo-tcp-transport map %O', this.dubboClusterTransportMap)
-    this.subscriber.onClose(host)
-  }
+      const transports = this.dubboClusterTransportMap.get(hostname)
+      log('delete dubbo-tcp-transport %s', transport.host)
+      transports.delete(transport)
+      log('current dubbo-tcp-transport map %O', this.dubboClusterTransportMap)
+      this.subscriber.onClose(host)
+    }
 
   private updateDubboClusterTransports(hostname: HostName, hosts: Set<Host>) {
     const transports = this.dubboClusterTransportMap.get(hostname)
@@ -153,6 +152,23 @@ export default class DubboCluster
     return this
   }
 
+  refresh(serviceHostMap: Map<HostName, Set<Host>>) {
+    for (let [hostname, hosts] of serviceHostMap.entries()) {
+      if (this.dubboClusterTransportMap.has(hostname)) {
+        const transports = this.dubboClusterTransportMap.get(hostname)
+        const transportHosts = [...transports].map(
+          (transport) => transport.host
+        )
+        const diff = [...hosts].filter((host) => !transportHosts.includes(host))
+        if (diff.length > 0) {
+          this.addDubboClusterTransports(hostname, new Set(diff))
+        }
+      } else {
+        this.addDubboClusterTransports(hostname, hosts)
+      }
+    }
+  }
+
   close() {
     for (let transports of this.dubboClusterTransportMap.values()) {
       transports.forEach((transport) => transport.close())
diff --git a/packages/dubbo-consumer/src/scheduler.ts b/packages/dubbo-consumer/src/scheduler.ts
index aff5362..d3e4640 100644
--- a/packages/dubbo-consumer/src/scheduler.ts
+++ b/packages/dubbo-consumer/src/scheduler.ts
@@ -49,6 +49,7 @@ export default class Scheduler {
   private status: STATUS
   private readonly queue: Queue
   private readonly registry: IRegistry<any>
+  private readonly refreshTimer: NodeJS.Timer
   private readonly dubboCluster: DubboCluster
   private readonly dubboServiceUrlMapper: Map<TDubboInterface, Array<DubboUrl>>
 
@@ -72,6 +73,9 @@ export default class Scheduler {
       },
       onClose: this.handleTransportClose
     })
+    this.refreshTimer = setInterval(() => {
+      this.refreshDubboCluster()
+    }, 10 * 1000)
 
     // init registry
     this.registry = registry
@@ -86,9 +90,26 @@ export default class Scheduler {
   }
 
   close() {
+    clearTimeout(this.refreshTimer)
     this.dubboCluster.close()
   }
 
+  private refreshDubboCluster() {
+    const serviceHostMap = new Map<HostName, Set<Host>>()
+    for (let urls of this.dubboServiceUrlMapper.values()) {
+      for (let { hostname, port } of urls) {
+        const host = `${hostname}:${port}`
+        if (serviceHostMap.has(hostname)) {
+          serviceHostMap.get(hostname).add(host)
+        } else {
+          serviceHostMap.set(hostname, new Set([host]))
+        }
+      }
+    }
+    log('refreshDubboCluster with map %O', serviceHostMap)
+    this.dubboCluster.refresh(serviceHostMap)
+  }
+
   /**
    * handle request in queue
    * @param ctx

Reply via email to