This is an automated email from the ASF dual-hosted git repository.
hufeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-js.git
The following commit(s) were added to refs/heads/master by this push:
new 8f7d456 enhancement: add dubbo-consumer scheduler and dubbo-cluseter
refresh
8f7d456 is described below
commit 8f7d4561d09ce08fb9b5a3c12a9687e0fc7dd681
Author: hufeng <[email protected]>
AuthorDate: Mon Aug 9 16:27:38 2021 +0800
enhancement: add dubbo-consumer scheduler and dubbo-cluseter refresh
---
packages/dubbo-consumer/src/dubbo-cluster.ts | 46 +++++++++++++++++++---------
packages/dubbo-consumer/src/scheduler.ts | 21 +++++++++++++
2 files changed, 52 insertions(+), 15 deletions(-)
diff --git a/packages/dubbo-consumer/src/dubbo-cluster.ts b/packages/dubbo-consumer/src/dubbo-cluster.ts
index b6ee3ba..123fcd3 100644
--- a/packages/dubbo-consumer/src/dubbo-cluster.ts
+++ b/packages/dubbo-consumer/src/dubbo-cluster.ts
@@ -34,7 +34,8 @@ const log = debug('dubbo:dubbo-cluster')
*/
export default class DubboCluster
- implements IDubboObservable<IDubboTransportSubscriber> {
+ implements IDubboObservable<IDubboTransportSubscriber>
+{
private subscriber: IDubboTransportSubscriber
private readonly dubboClusterTransportMap: Map<
HostName,
@@ -53,21 +54,19 @@ export default class DubboCluster
// ~~~~~~~~~~~~~~~~~~~~private methods~~~~~~~~~~~~~~~~~~~~~~~~~
- private handleTransportClose = (
- transport: DubboTcpTransport,
- hostname: string
- ) => (host: string) => {
- log('receive dubbo-tcp-transport closed %s', transport.host)
- if (!this.dubboClusterTransportMap.has(hostname)) {
- return
- }
+ private handleTransportClose = // curried: binds a transport and its hostname, returns the close callback
+ (transport: DubboTcpTransport, hostname: string) => (host: string) => {
+ log('receive dubbo-tcp-transport closed %s', transport.host)
+ if (!this.dubboClusterTransportMap.has(hostname)) { // hostname is not tracked: nothing to remove
+ return
+ }
- const transports = this.dubboClusterTransportMap.get(hostname)
- log('delete dubbo-tcp-transport %s', transport.host)
- transports.delete(transport)
- log('current dubbo-tcp-transport map %O', this.dubboClusterTransportMap)
- this.subscriber.onClose(host)
- }
+ const transports = this.dubboClusterTransportMap.get(hostname)
+ log('delete dubbo-tcp-transport %s', transport.host)
+ transports.delete(transport) // drop the closed transport from this hostname's collection
+ log('current dubbo-tcp-transport map %O', this.dubboClusterTransportMap)
+ this.subscriber.onClose(host) // forward the host passed to the callback to the subscriber
+ }
private updateDubboClusterTransports(hostname: HostName, hosts: Set<Host>) {
const transports = this.dubboClusterTransportMap.get(hostname)
@@ -153,6 +152,23 @@ export default class DubboCluster
return this
}
+ /**
+  * Bring the cluster in line with the given hostname -> hosts view by
+  * requesting transports for hosts that do not have one yet.
+  *
+  * This method only ever calls addDubboClusterTransports; nothing here
+  * closes or removes an existing transport.
+  *
+  * @param serviceHostMap hosts currently known for each hostname
+  */
+ refresh(serviceHostMap: Map<HostName, Set<Host>>) {
+   for (const [hostname, hosts] of serviceHostMap) {
+     // one lookup instead of has() followed by get()
+     const transports = this.dubboClusterTransportMap.get(hostname)
+     if (!transports) {
+       // hostname not tracked yet: every host needs a transport
+       this.addDubboClusterTransports(hostname, hosts)
+       continue
+     }
+     // Set lookup keeps the comparison linear in the number of hosts
+     const connectedHosts = new Set(
+       [...transports].map((transport) => transport.host)
+     )
+     const missingHosts = [...hosts].filter(
+       (host) => !connectedHosts.has(host)
+     )
+     if (missingHosts.length > 0) {
+       this.addDubboClusterTransports(hostname, new Set(missingHosts))
+     }
+   }
+ }
+
close() {
for (let transports of this.dubboClusterTransportMap.values()) {
transports.forEach((transport) => transport.close())
diff --git a/packages/dubbo-consumer/src/scheduler.ts b/packages/dubbo-consumer/src/scheduler.ts
index aff5362..d3e4640 100644
--- a/packages/dubbo-consumer/src/scheduler.ts
+++ b/packages/dubbo-consumer/src/scheduler.ts
@@ -49,6 +49,7 @@ export default class Scheduler {
private status: STATUS
private readonly queue: Queue
private readonly registry: IRegistry<any>
+ private readonly refreshTimer: NodeJS.Timer
private readonly dubboCluster: DubboCluster
private readonly dubboServiceUrlMapper: Map<TDubboInterface, Array<DubboUrl>>
@@ -72,6 +73,9 @@ export default class Scheduler {
},
onClose: this.handleTransportClose
})
+ this.refreshTimer = setInterval(() => {
+ this.refreshDubboCluster()
+ }, 10 * 1000)
// init registry
this.registry = registry
@@ -86,9 +90,26 @@ export default class Scheduler {
}
close() {
+ clearTimeout(this.refreshTimer)
this.dubboCluster.close()
}
+ /**
+  * Group the `hostname:port` of every url held in dubboServiceUrlMapper by
+  * hostname and pass the resulting map to DubboCluster#refresh.
+  */
+ private refreshDubboCluster() {
+   const hostsByHostname = new Map<HostName, Set<Host>>()
+   for (const serviceUrls of this.dubboServiceUrlMapper.values()) {
+     for (const { hostname, port } of serviceUrls) {
+       // get-or-create the bucket for this hostname, then record the host
+       let bucket = hostsByHostname.get(hostname)
+       if (bucket === undefined) {
+         bucket = new Set()
+         hostsByHostname.set(hostname, bucket)
+       }
+       bucket.add(`${hostname}:${port}`)
+     }
+   }
+   log('refreshDubboCluster with map %O', hostsByHostname)
+   this.dubboCluster.refresh(hostsByHostname)
+ }
+
/**
* handle request in queue
* @param ctx