This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new a8fba70 [ISSUE #1601] polarismesh registry center (#1620)
a8fba70 is described below
commit a8fba7015a448387362a41de644aadce47318fae
Author: liaochuntao <[email protected]>
AuthorDate: Wed Dec 1 22:13:04 2021 +0800
[ISSUE #1601] polarismesh registry center (#1620)
* feat: try to support dubbo
* Merge 3.0 to master (#1606)
* build(deps): bump github.com/hashicorp/vault/sdk from 0.2.1 to 0.3.0
(#1552)
Bumps [github.com/hashicorp/vault/sdk](https://github.com/hashicorp/vault)
from 0.2.1 to 0.3.0.
- [Release notes](https://github.com/hashicorp/vault/releases)
- [Changelog](https://github.com/hashicorp/vault/blob/main/CHANGELOG.md)
- [Commits](https://github.com/hashicorp/vault/compare/sdk/v0.2.1...v0.3.0)
---
updated-dependencies:
- dependency-name: github.com/hashicorp/vault/sdk
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* delete useless codes (#1551)
* Update CHANGELOG.md
* [#1559]: adjust the startup process of the configuration center (#1560)
* [#1559]:
1. fix the startup process of the configuration center(remove error log)
2. refactor zk registry listener
* delete redundant dynamic configuration center cache
Co-authored-by: dongjianhui03 <[email protected]>
* [#1276]modify the name of the receiver to a single character (#1561)
Co-authored-by: dongjianhui03 <[email protected]>
* style(common): rename constants in camel-case style (#1563)
* style(common): rename constants in camel-case style
* style(common): rename Ttl to TTL
* fix: #1558: Set root config to global ptr in Init() function. (#1564)
* fix: #1558
* fix: change the api to start the app by config-api
* fix: in the Consumer service, the Reference config property ‘check’ use
the Consumer config when omitted
* Triple upgrade to grpc (#1566)
* fix: init triple grpc
* fix: delete replace
* Fix: upgrade triple grpc
* Fix: remove unused file
* Fix decrease grpc go version to 1.14
* Fix: add server side serialization
* register not use metadata configuration.
* build(deps): bump github.com/go-resty/resty/v2 from 2.3.0 to 2.7.0 (#1579)
Bumps [github.com/go-resty/resty/v2](https://github.com/go-resty/resty)
from 2.3.0 to 2.7.0.
- [Release notes](https://github.com/go-resty/resty/releases)
- [Commits](https://github.com/go-resty/resty/compare/v2.3.0...v2.7.0)
---
updated-dependencies:
- dependency-name: github.com/go-resty/resty/v2
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* build(deps): bump google.golang.org/grpc from 1.41.0 to 1.42.0 (#1578)
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.41.0
to 1.42.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.41.0...v1.42.0)
---
updated-dependencies:
- dependency-name: google.golang.org/grpc
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* build(deps): bump github.com/knadh/koanf from 1.3.0 to 1.3.2 (#1577)
Bumps [github.com/knadh/koanf](https://github.com/knadh/koanf) from 1.3.0
to 1.3.2.
- [Release notes](https://github.com/knadh/koanf/releases)
- [Commits](https://github.com/knadh/koanf/compare/v1.3.0...v1.3.2)
---
updated-dependencies:
- dependency-name: github.com/knadh/koanf
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* fix: registry param
* fix: fix ns and group
* fix: integrate test
* Fix: integrate test
* fix: url fmt
* fix: nacos servicediscovery group
* Fix: enhance zk
* start application (#1571)
Co-authored-by: wangxiaowei14227 <[email protected]>
Co-authored-by: lizhixin.lzx <[email protected]>
Co-authored-by: LaurenceLiZhixin <[email protected]>
* graceful shutdown filter (#1585)
Co-authored-by: wangxiaowei14227 <[email protected]>
* Fix: some bugs and features for 3.0 (#1586)
* fix: some bugs
* Fix: remove file\k8s registry, k8s remote
* refactor listenDirEvent
1. remove CLEAR label;
2. remove register/unregister logic for watch children;
3. reorder the zk dynamic configuration code
4. adapt the gost
* hessian encode attachments return error (#1588)
* upgrade gost dependency version
* upgrade hessian to v1.9.5 (#1595)
* build(deps): bump k8s.io/apimachinery from 0.16.9 to 0.22.4 (#1600)
Bumps [k8s.io/apimachinery](https://github.com/kubernetes/apimachinery)
from 0.16.9 to 0.22.4.
- [Release notes](https://github.com/kubernetes/apimachinery/releases)
- [Commits](https://github.com/kubernetes/apimachinery/compare/v0.16.9...v0.22.4)
---
updated-dependencies:
- dependency-name: k8s.io/apimachinery
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* ftr: triple tracing (#1596)
* ftr: triple tracing
* fix: if-else to switch
* [fix #1590] filter single instance (#1591)
* filter single instance
* filter single instance
Co-authored-by: wangxiaowei14227 <[email protected]>
* Ftr/triple reflect support (#1603)
* ftr: triple-reflection
* fix: comment and var name
* Fix: comment typo
* Fix: official website
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xin.Zh <[email protected]>
Co-authored-by: Mulavar <[email protected]>
Co-authored-by: dongjianhui03 <[email protected]>
Co-authored-by: Xuewei Niu <[email protected]>
Co-authored-by: PhilYue <[email protected]>
Co-authored-by: lizhixin.lzx <[email protected]>
Co-authored-by: sanxun0325 <[email protected]>
Co-authored-by: Xuewei Niu <[email protected]>
Co-authored-by: zhaoyunxing <[email protected]>
Co-authored-by: wangxw <[email protected]>
Co-authored-by: wangxiaowei14227 <[email protected]>
Co-authored-by: 望哥 <[email protected]>
* feat: support polaris
* feat: support polarismesh registry
* fix: fix cr issue
* chore: run go mod tidy to fix ci-error
* refactor: change polaris::ConsumerAPI.GetInstances => GetAllInstances
* fix github action scan error
* fix: fix go fmt error
* fix: import formatter
* fix: fix cr issue
* fix: fix code-style check error
* fix: fix cr issue
* fix: fix maybe nil error
* fix: fix lose event bug
* style: fix code style
Co-authored-by: springliao <[email protected]>
Co-authored-by: Laurence <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Xin.Zh <[email protected]>
Co-authored-by: Mulavar <[email protected]>
Co-authored-by: dongjianhui03 <[email protected]>
Co-authored-by: Xuewei Niu <[email protected]>
Co-authored-by: PhilYue <[email protected]>
Co-authored-by: lizhixin.lzx <[email protected]>
Co-authored-by: sanxun0325 <[email protected]>
Co-authored-by: Xuewei Niu <[email protected]>
Co-authored-by: zhaoyunxing <[email protected]>
Co-authored-by: wangxw <[email protected]>
Co-authored-by: wangxiaowei14227 <[email protected]>
Co-authored-by: 望哥 <[email protected]>
Co-authored-by: LaurenceLiZhixin <[email protected]>
---
common/constant/key.go | 14 ++
go.mod | 2 +-
go.sum | 18 ++
imports/imports.go | 1 +
registry/polaris/core.go | 133 ++++++++++++
registry/polaris/listener.go | 118 +++++++++++
registry/polaris/registry.go | 323 +++++++++++++++++++++++++++++
registry/polaris/service_discovery.go | 368 ++++++++++++++++++++++++++++++++++
registry/polaris/utils.go | 72 +++++++
remoting/polaris/builder.go | 106 ++++++++++
remoting/polaris/builder_test.go | 106 ++++++++++
remoting/polaris/polaris.yaml | 95 +++++++++
12 files changed, 1355 insertions(+), 1 deletion(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index d317979..4c55383 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -241,6 +241,20 @@ const (
)
const (
+ PolarisKey = "polaris"
+ PolarisDefaultRoleType = 3
+ PolarisConfigFilePath = "configPath"
+ PolarisNamespace = "namespace"
+ PolarisServiceToken = "token"
+ PolarisServiceNameSeparator = ":"
+ PolarisDubboPath = "DUBBOPATH"
+ PolarisInstanceID = "polaris.instanceID"
+ PolarisDefaultNamespace = "default"
+ PolarisDubboGroup = "dubbo.group"
+ PolarisClientName = "polaris-client"
+)
+
+const (
FileKey = "file"
)
diff --git a/go.mod b/go.mod
index a6c6f71..f7d77ef 100644
--- a/go.mod
+++ b/go.mod
@@ -34,6 +34,7 @@ require (
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
+ github.com/polarismesh/polaris-go v1.0.0
github.com/prometheus/client_golang v1.11.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/stretchr/testify v1.7.0
@@ -45,7 +46,6 @@ require (
go.uber.org/zap v1.19.1
google.golang.org/grpc v1.42.0
google.golang.org/protobuf v1.27.1
- gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0
k8s.io/apimachinery v0.22.4
k8s.io/client-go v0.16.9
diff --git a/go.sum b/go.sum
index f1c105b..0d62983 100644
--- a/go.sum
+++ b/go.sum
@@ -66,6 +66,8 @@ github.com/Workiva/go-datastructures v1.0.52
h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9
github.com/Workiva/go-datastructures v1.0.52/go.mod
h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod
h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
+github.com/agiledragon/gomonkey v0.0.0-20190517145658-8fa491f7b918
h1:a88Ln+jbIokfi6xoKtq10dbgp4VMg1CmHF1J42p8EyE=
+github.com/agiledragon/gomonkey v0.0.0-20190517145658-8fa491f7b918/go.mod
h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod
h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -339,6 +341,13 @@ github.com/golang/snappy
v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1/go.mod
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gonum/blas v0.0.0-20181208220705-f22b278b28ac/go.mod
h1:P32wAyui1PQ58Oce/KYkOqQv8cVw1zAapXOl+dRFGbc=
+github.com/gonum/floats v0.0.0-20181209220543-c233463c7e82/go.mod
h1:PxC8OnwL11+aosOB5+iEPoV3picfs8tUpkVd0pDo+Kg=
+github.com/gonum/integrate v0.0.0-20181209220457-a422b5c0fdf2/go.mod
h1:pDgmNM6seYpwvPos3q+zxlXMsbve6mOIPucUnUOrI7Y=
+github.com/gonum/internal v0.0.0-20181124074243-f884aa714029/go.mod
h1:Pu4dmpkhSyOzRwuXkOgAvijx4o+4YMUJJo9OvPYMkks=
+github.com/gonum/lapack v0.0.0-20181123203213-e4cdc5a0bff9/go.mod
h1:XA3DeT6rxh2EAE789SSiSJNqxPaC0aE9J8NTOI0Jo/A=
+github.com/gonum/matrix v0.0.0-20181209220409-c518dec07be9/go.mod
h1:0EXg4mc1CNP0HCqCz+K4ts155PXIlUywf0wqN+GfPZw=
+github.com/gonum/stat v0.0.0-20181125101827-41a0da705a5b/go.mod
h1:Z4GIJBJO3Wa4gD4vbwQxXXZ+WHmW6E9ixmNrwvs0iZs=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod
h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -420,6 +429,7 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod
h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-kms-wrapping/entropy v0.1.0/go.mod
h1:d1g9WGtAunDNpek8jUIEJnBlbgKS1N2Q61QkHiZyR1g=
github.com/hashicorp/go-msgpack v0.5.3/go.mod
h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod
h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-multierror v1.1.1
h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod
h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-plugin v1.0.1/go.mod
h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY=
github.com/hashicorp/go-plugin v1.4.3/go.mod
h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ=
@@ -555,6 +565,7 @@ github.com/mitchellh/copystructure v1.0.0/go.mod
h1:SNtv71yrdKgLRyLFxmLdkAbkKEFW
github.com/mitchellh/copystructure v1.2.0
h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod
h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-homedir v1.0.0/go.mod
h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/go-homedir v1.1.0
h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod
h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface
v0.0.0-20171004221916-a61a99592b77/go.mod
h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod
h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
@@ -648,6 +659,8 @@ github.com/pkg/profile v1.2.1/go.mod
h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6J
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/polarismesh/polaris-go v1.0.0 h1:JIBANM5nfhu5knbg269kldQ58bSSV7a6AzTQk1OZwt8=
+github.com/polarismesh/polaris-go v1.0.0/go.mod h1:uzNFDShCN+UhBncwwNqNVhPpI1ZXYwPlb9N/aE+/vE0=
github.com/posener/complete v1.1.1/go.mod
h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.1/go.mod
h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang
v0.9.3-0.20190127221311-3c4408c8b829/go.mod
h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
@@ -720,6 +733,7 @@ github.com/sirupsen/logrus v1.7.0/go.mod
h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d
h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
+github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4
h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod
h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -727,6 +741,8 @@ github.com/soheilhy/cmux
v0.1.5-0.20210205191134-5ec6847320e5 h1:GJTW+uNMIV1RKwo
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod
h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/sony/gobreaker v0.4.1/go.mod
h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod
h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spaolacci/murmur3 v1.1.0
h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
+github.com/spaolacci/murmur3 v1.1.0/go.mod
h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod
h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod
h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
@@ -836,6 +852,7 @@ go.uber.org/atomic v1.9.0/go.mod
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723
h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod
h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.2.0/go.mod
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod
h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod
h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
@@ -932,6 +949,7 @@ golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod
h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200320220750-118fecf932d8/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
diff --git a/imports/imports.go b/imports/imports.go
index a71092e..fb66f0d 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -65,6 +65,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/protocol/rest"
_ "dubbo.apache.org/dubbo-go/v3/registry/etcdv3"
_ "dubbo.apache.org/dubbo-go/v3/registry/nacos"
+ _ "dubbo.apache.org/dubbo-go/v3/registry/polaris"
_ "dubbo.apache.org/dubbo-go/v3/registry/protocol"
_ "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery"
_ "dubbo.apache.org/dubbo-go/v3/registry/zookeeper"
diff --git a/registry/polaris/core.go b/registry/polaris/core.go
new file mode 100644
index 0000000..d505d14
--- /dev/null
+++ b/registry/polaris/core.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/polarismesh/polaris-go/api"
+ "github.com/polarismesh/polaris-go/pkg/model"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+)
+
+type subscriber func(remoting.EventType, []model.Instance)
+
+// PolarisServiceWatcher
+type PolarisServiceWatcher struct {
+ consumer api.ConsumerAPI
+ subscribeParam *api.WatchServiceRequest
+ lock *sync.RWMutex
+ subscribers []subscriber
+ execOnce *sync.Once
+}
+
+// newPolarisWatcher create PolarisServiceWatcher to do watch service action
+func newPolarisWatcher(param *api.WatchServiceRequest, consumer api.ConsumerAPI) (*PolarisServiceWatcher, error) {
+ watcher := &PolarisServiceWatcher{
+ subscribeParam: param,
+ consumer: consumer,
+ lock: &sync.RWMutex{},
+ subscribers: make([]subscriber, 0),
+ execOnce: &sync.Once{},
+ }
+ return watcher, nil
+}
+
+// AddSubscriber add subscriber into watcher's subscribers
+func (watcher *PolarisServiceWatcher) AddSubscriber(subscriber func(remoting.EventType, []model.Instance)) {
+
+ watcher.lazyRun()
+
+ watcher.lock.Lock()
+ defer watcher.lock.Unlock()
+
+ watcher.subscribers = append(watcher.subscribers, subscriber)
+}
+
+// lazyRun Delayed execution, only triggered when AddSubscriber is called, and will only be executed once
+func (watcher *PolarisServiceWatcher) lazyRun() {
+ watcher.execOnce.Do(func() {
+ go watcher.startWatch()
+ })
+}
+
+// startWatch start run work to watch target service by polaris
+func (watcher *PolarisServiceWatcher) startWatch() {
+
+ for {
+ resp, err := watcher.consumer.WatchService(watcher.subscribeParam)
+ if err != nil {
+ time.Sleep(time.Duration(500 * time.Millisecond))
+ continue
+ }
+
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: resp.GetAllInstancesResp.Instances,
+ ConfigType: remoting.EventTypeAdd,
+ })
+
+ select {
+ case event := <-resp.EventChannel:
+ eType := event.GetSubScribeEventType()
+ if eType == api.EventInstance {
+ insEvent := event.(*model.InstanceEvent)
+ if insEvent.AddEvent != nil {
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: insEvent.AddEvent.Instances,
+ ConfigType: remoting.EventTypeAdd,
+ })
+ }
+ if insEvent.UpdateEvent != nil {
+ instances := make([]model.Instance, len(insEvent.UpdateEvent.UpdateList))
+ for i := range insEvent.UpdateEvent.UpdateList {
+ instances[i] = insEvent.UpdateEvent.UpdateList[i].After
+ }
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: instances,
+ ConfigType: remoting.EventTypeUpdate,
+ })
+ }
+ if insEvent.DeleteEvent != nil {
+ watcher.notifyAllSubscriber(&config_center.ConfigChangeEvent{
+ Value: insEvent.DeleteEvent.Instances,
+ ConfigType: remoting.EventTypeDel,
+ })
+ }
+ }
+ }
+ }
+}
+
+// notifyAllSubscriber notify config_center.ConfigChangeEvent to all subscriber
+func (watcher *PolarisServiceWatcher) notifyAllSubscriber(event *config_center.ConfigChangeEvent) {
+ watcher.lock.RLock()
+ defer watcher.lock.RUnlock()
+
+ for i := 0; i < len(watcher.subscribers); i++ {
+ subscriber := watcher.subscribers[i]
+ subscriber(event.ConfigType, event.Value.([]model.Instance))
+ }
+
+}
diff --git a/registry/polaris/listener.go b/registry/polaris/listener.go
new file mode 100644
index 0000000..4da78ca
--- /dev/null
+++ b/registry/polaris/listener.go
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "bytes"
+ "net/url"
+ "strconv"
+)
+
+import (
+ gxchan "github.com/dubbogo/gost/container/chan"
+
+ perrors "github.com/pkg/errors"
+
+ "github.com/polarismesh/polaris-go/pkg/model"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+type polarisListener struct {
+ watcher *PolarisServiceWatcher
+ listenUrl *common.URL
+ events *gxchan.UnboundedChan
+ closeCh chan struct{}
+}
+
+// NewPolarisListener new polaris listener
+func NewPolarisListener(url *common.URL) (*polarisListener, error) {
+ listener := &polarisListener{
+ listenUrl: url,
+ events: gxchan.NewUnboundedChan(32),
+ closeCh: make(chan struct{}),
+ }
+ return listener, nil
+}
+
+// Next returns next service event once received
+func (pl *polarisListener) Next() (*registry.ServiceEvent, error) {
+ for {
+ select {
+ case <-pl.closeCh:
+ logger.Warnf("polaris listener is close!listenUrl:%+v", pl.listenUrl)
+ return nil, perrors.New("listener stopped")
+ case val := <-pl.events.Out():
+ e, _ := val.(*config_center.ConfigChangeEvent)
+ logger.Debugf("got polaris event %s", e)
+ instance := e.Value.(model.Instance)
+ return &registry.ServiceEvent{Action: e.ConfigType, Service: generateUrl(instance)}, nil
+ }
+ }
+}
+
+// Close closes this listener
+func (pl *polarisListener) Close() {
+ // TODO need to add UnWatch in polaris
+ close(pl.closeCh)
+}
+
+func getSubscribeName(url *common.URL) string {
+ var buffer bytes.Buffer
+ buffer.Write([]byte(common.DubboNodes[common.PROVIDER]))
+ appendParam(&buffer, url, constant.InterfaceKey)
+ return buffer.String()
+}
+
+func generateUrl(instance model.Instance) *common.URL {
+ if instance.GetMetadata() == nil {
+ logger.Errorf("polaris instance metadata is empty,instance:%+v", instance)
+ return nil
+ }
+ path := instance.GetMetadata()["path"]
+ myInterface := instance.GetMetadata()["interface"]
+ if len(path) == 0 && len(myInterface) == 0 {
+ logger.Errorf("polaris instance metadata does not have both path key and interface key,instance:%+v", instance)
+ return nil
+ }
+ if len(path) == 0 && len(myInterface) != 0 {
+ path = "/" + myInterface
+ }
+ protocol := instance.GetProtocol()
+ if len(protocol) == 0 {
+ logger.Errorf("polaris instance metadata does not have protocol key,instance:%+v", instance)
+ return nil
+ }
+ urlMap := url.Values{}
+ for k, v := range instance.GetMetadata() {
+ urlMap.Set(k, v)
+ }
+ return common.NewURLWithOptions(
+ common.WithIp(instance.GetHost()),
+ common.WithPort(strconv.Itoa(int(instance.GetPort()))),
+ common.WithProtocol(protocol),
+ common.WithParams(urlMap),
+ common.WithPath(path),
+ )
+}
diff --git a/registry/polaris/registry.go b/registry/polaris/registry.go
new file mode 100644
index 0000000..d95ea8e
--- /dev/null
+++ b/registry/polaris/registry.go
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "context"
+ "strconv"
+ "sync"
+ "time"
+)
+
+import (
+ perrors "github.com/pkg/errors"
+
+ "github.com/polarismesh/polaris-go/api"
+ "github.com/polarismesh/polaris-go/pkg/model"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+ "dubbo.apache.org/dubbo-go/v3/remoting/polaris"
+)
+
+var localIP = ""
+
+const (
+ // RegistryConnDelay registry connection delay
+ RegistryConnDelay = 3
+)
+
+func init() {
+ localIP = common.GetLocalIp()
+ extension.SetRegistry(constant.PolarisKey, newPolarisRegistry)
+}
+
+// newPolarisRegistry will create new instance
+func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
+ sdkCtx, _, err := polaris.GetPolarisConfig(url)
+ if err != nil {
+ return &polarisRegistry{}, err
+ }
+ pRegistry := &polarisRegistry{
+ consumer: api.NewConsumerAPIByContext(sdkCtx),
+ provider: api.NewProviderAPIByContext(sdkCtx),
+ lock: &sync.RWMutex{},
+ registryUrls: make(map[string]*PolarisHeartbeat),
+ listenerLock: &sync.RWMutex{},
+ watchers: make(map[string]*PolarisServiceWatcher),
+ }
+
+ return pRegistry, nil
+}
+
+type polarisRegistry struct {
+ url *common.URL
+ consumer api.ConsumerAPI
+ provider api.ProviderAPI
+ lock *sync.RWMutex
+ registryUrls map[string]*PolarisHeartbeat
+
+ listenerLock *sync.RWMutex
+ watchers map[string]*PolarisServiceWatcher
+}
+
+// Register
+func (pr *polarisRegistry) Register(url *common.URL) error {
+ serviceName := getServiceName(url)
+ param := createRegisterParam(url, serviceName)
+ resp, err := pr.provider.Register(param)
+ if err != nil {
+ return err
+ }
+
+ if resp.Existed {
+ logger.Warnf("instance already regist, namespace:%+v, service:%+v, host:%+v, port:%+v",
+ param.Namespace, param.Service, param.Host, param.Port)
+ }
+
+ pr.lock.Lock()
+ defer pr.lock.Unlock()
+
+ url.SetParam(constant.PolarisInstanceID, resp.InstanceID)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ go pr.doHeartbeat(ctx, param)
+
+ pr.registryUrls[url.Key()] = &PolarisHeartbeat{
+ url: url,
+ cancel: cancel,
+ }
+ return nil
+}
+
+// UnRegister
+func (pr *polarisRegistry) UnRegister(conf *common.URL) error {
+ var (
+ ok bool
+ err error
+ oldVal *PolarisHeartbeat
+ )
+
+ func() {
+ pr.lock.Lock()
+ defer pr.lock.Unlock()
+
+ oldVal, ok = pr.registryUrls[conf.Key()]
+
+ if !ok {
+ err = perrors.Errorf("Path{%s} has not registered", conf.Key())
+ return
+ }
+
+ oldVal.cancel()
+ delete(pr.registryUrls, oldVal.url.Key())
+ }()
+
+ if err != nil {
+ return err
+ }
+
+ request := createDeregisterParam(conf, getServiceName(conf))
+
+ err = pr.provider.Deregister(request)
+ if err != nil {
+ func() {
+ pr.lock.Lock()
+ defer pr.lock.Unlock()
+ pr.registryUrls[conf.Key()] = oldVal
+ }()
+ return perrors.WithMessagef(err, "register(conf:%+v)", conf)
+ }
+ return nil
+}
+
+// Subscribe
+func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
+ role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
+ if role != common.CONSUMER {
+ return nil
+ }
+
+ watcher, err := pr.createPolarisWatcherIfAbsent(url)
+
+ if err != nil {
+ return err
+ }
+
+ for {
+ listener, err := NewPolarisListener(url)
+ if err != nil {
+ logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
+ <-time.After(time.Duration(RegistryConnDelay) * time.Second)
+ continue
+ }
+
+ watcher.AddSubscriber(func(et remoting.EventType, instances []model.Instance) {
+ for i := range instances {
+ instance := instances[i]
+ listener.events.In() <- &config_center.ConfigChangeEvent{ConfigType: et, Value: instance}
+ }
+ })
+
+ for {
+ serviceEvent, err := listener.Next()
+ if err != nil {
+ logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
+ listener.Close()
+ return err
+ }
+ logger.Infof("update begin, service event: %v", serviceEvent.String())
+ notifyListener.Notify(serviceEvent)
+ }
+ }
+}
+
+func (pr *polarisRegistry) createPolarisWatcherIfAbsent(url *common.URL) (*PolarisServiceWatcher, error) {
+
+ pr.listenerLock.Lock()
+ defer pr.listenerLock.Unlock()
+
+ serviceName := getSubscribeName(url)
+
+ if _, exist := pr.watchers[serviceName]; !exist {
+ subscribeParam := &api.WatchServiceRequest{
+ WatchServiceRequest: model.WatchServiceRequest{
+ Key: model.ServiceKey{
+ Namespace: url.GetParam(constant.PolarisNamespace, constant.PolarisDefaultNamespace),
+ Service: serviceName,
+ },
+ },
+ }
+
+ watcher, err := newPolarisWatcher(subscribeParam, pr.consumer)
+ if err != nil {
+ return nil, err
+ }
+ pr.watchers[serviceName] = watcher
+ }
+
+ return pr.watchers[serviceName], nil
+}
+
+// UnSubscribe
+func (pr *polarisRegistry) UnSubscribe(url *common.URL, notifyListener
registry.NotifyListener) error {
+ // TODO: wait for polaris to support it
+ return perrors.New("UnSubscribe is not supported in polarisRegistry")
+}
+
+// GetURL
+func (pr *polarisRegistry) GetURL() *common.URL {
+ return pr.url
+}
+
+// Destroy
+func (pr *polarisRegistry) Destroy() {
+ for _, val := range pr.registryUrls {
+ val.cancel()
+ err := pr.UnRegister(val.url)
+ logger.Infof("DeRegister Polaris URL:%+v", val.url)
+ if err != nil {
+ logger.Errorf("Deregister URL:%+v err:%v", val.url,
err.Error())
+ }
+ }
+ return
+}
+
+// IsAvailable always returns true when polaris is used
+func (pr *polarisRegistry) IsAvailable() bool {
+ return true
+}
+
+// doHeartbeat Since polaris does not support automatic reporting of instance
heartbeats, separate logic is
+// needed to implement it
+func (pr *polarisRegistry) doHeartbeat(ctx context.Context, ins
*api.InstanceRegisterRequest) {
+ ticker := time.NewTicker(time.Duration(4) * time.Second)
+ defer ticker.Stop()
+
+ heartbeat := &api.InstanceHeartbeatRequest{
+ InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{
+ Service: ins.Service,
+ Namespace: ins.Namespace,
+ Host: ins.Host,
+ Port: ins.Port,
+ },
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ pr.provider.Heartbeat(heartbeat)
+ }
+ }
+}
+
+// createRegisterParam converts a dubbo URL to a polaris instance register request
+func createRegisterParam(url *common.URL, serviceName string)
*api.InstanceRegisterRequest {
+ if len(url.Ip) == 0 {
+ url.Ip = localIP
+ }
+ if len(url.Port) == 0 || url.Port == "0" {
+ url.Port = "80"
+ }
+ port, _ := strconv.Atoi(url.Port)
+
+ metadata := make(map[string]string, len(url.GetParams()))
+ url.RangeParams(func(key, value string) bool {
+ metadata[key] = value
+ return true
+ })
+ metadata[constant.PolarisDubboPath] = url.Path
+
+ return &api.InstanceRegisterRequest{
+ InstanceRegisterRequest: model.InstanceRegisterRequest{
+ Service: serviceName,
+ Namespace: url.GetParam(constant.PolarisNamespace,
constant.PolarisDefaultNamespace),
+ Host: url.Ip,
+ Port: port,
+ Protocol: &protocolForDubboGO,
+ Metadata: metadata,
+ },
+ }
+}
+
+// createDeregisterParam converts a dubbo URL to a polaris instance deregister request
+func createDeregisterParam(url *common.URL, serviceName string)
*api.InstanceDeRegisterRequest {
+ if len(url.Ip) == 0 {
+ url.Ip = localIP
+ }
+ if len(url.Port) == 0 || url.Port == "0" {
+ url.Port = "80"
+ }
+ port, _ := strconv.Atoi(url.Port)
+ return &api.InstanceDeRegisterRequest{
+ InstanceDeRegisterRequest: model.InstanceDeRegisterRequest{
+ Service: serviceName,
+ Namespace: url.GetParam(constant.PolarisNamespace,
constant.PolarisDefaultNamespace),
+ Host: url.Ip,
+ Port: port,
+ },
+ }
+}
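
Note on the doHeartbeat loop above: a minimal, self-contained sketch of the same ticker-plus-context pattern, not the code from this commit. The names heartbeatLoop and runBeats and the beat callback are hypothetical stand-ins for doHeartbeat and pr.provider.Heartbeat; no polaris-go call is used. The sketch stops the ticker when the loop exits, which releases the ticker's resources once the context is cancelled.

```go
package main

import (
	"context"
	"time"
)

// heartbeatLoop calls beat on every tick until ctx is cancelled.
// The ticker is stopped on exit so that it does not outlive the loop.
func heartbeatLoop(ctx context.Context, interval time.Duration, beat func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// A real caller would probably log this error instead of dropping it.
			_ = beat()
		}
	}
}

// runBeats is a hypothetical driver: it lets the loop send n heartbeats,
// cancels the context and returns how many heartbeats were counted.
func runBeats(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sent := 0
	done := make(chan struct{})
	go func() {
		defer close(done)
		heartbeatLoop(ctx, time.Millisecond, func() error {
			if sent < n {
				sent++
			}
			if sent == n {
				cancel() // plays the role of the stored cancel func
			}
			return nil
		})
	}()

	<-done // the loop has returned, so reading sent is safe
	return sent
}
```

Calling runBeats(3) drives the loop with a 1ms interval and returns once the loop has observed the cancellation. In the registry the cancel function would be the one kept in PolarisHeartbeat.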
diff --git a/registry/polaris/service_discovery.go
b/registry/polaris/service_discovery.go
new file mode 100644
index 0000000..6ae06c8
--- /dev/null
+++ b/registry/polaris/service_discovery.go
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "sync"
+ "time"
+)
+
+import (
+ gxset "github.com/dubbogo/gost/container/set"
+ gxpage "github.com/dubbogo/gost/hash/page"
+
+ perrors "github.com/pkg/errors"
+
+ "github.com/polarismesh/polaris-go/api"
+ "github.com/polarismesh/polaris-go/pkg/model"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/config"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+ "dubbo.apache.org/dubbo-go/v3/remoting/polaris"
+)
+
+// newPolarisServiceDiscovery will create new service discovery instance
+func newPolarisServiceDiscovery() (registry.ServiceDiscovery, error) {
+ metadataReportConfig := config.GetMetadataReportConfg()
+ url := common.NewURLWithOptions(
+ common.WithParams(make(url.Values)),
+ common.WithParamsValue(constant.RegistryTimeoutKey,
metadataReportConfig.Timeout))
+ url.Location = metadataReportConfig.Address
+
+ sdkCtx, namespace, err := polaris.GetPolarisConfig(url)
+
+ if err != nil {
+ return nil, perrors.WithMessage(err, "create polaris
namingClient failed.")
+ }
+
+ descriptor := fmt.Sprintf("polaris-service-discovery[%s]",
metadataReportConfig.Address)
+
+ newInstance := &polarisServiceDiscovery{
+ namespace: namespace,
+ descriptor: descriptor,
+ instanceLock: &sync.RWMutex{},
+ consumer: api.NewConsumerAPIByContext(sdkCtx),
+ provider: api.NewProviderAPIByContext(sdkCtx),
+ registryInstances: make(map[string]*PolarisHeartbeat),
+ services: gxset.NewSet(),
+ listenerLock: &sync.RWMutex{},
+ watchers: make(map[string]*PolarisServiceWatcher),
+ }
+ return newInstance, nil
+}
+
+type polarisServiceDiscovery struct {
+ namespace string
+ descriptor string
+ provider api.ProviderAPI
+ consumer api.ConsumerAPI
+ services *gxset.HashSet
+ instanceLock *sync.RWMutex
+ registryInstances map[string]*PolarisHeartbeat
+ watchers map[string]*PolarisServiceWatcher
+ listenerLock *sync.RWMutex
+}
+
+// Destroy unregisters every ServiceInstance and then closes the polaris
+// ConsumerAPI and ProviderAPI
+func (polaris *polarisServiceDiscovery) Destroy() error {
+ for _, inst := range polaris.registryInstances {
+
+ inst.cancel()
+
+ err := polaris.Unregister(inst.instance)
+ logger.Infof("Unregister polaris instance:%+v", inst)
+ if err != nil {
+ logger.Errorf("Unregister polaris instance:%+v, err:%+v", inst, err)
+ }
+ }
+ polaris.provider.Destroy()
+ polaris.consumer.Destroy()
+ return nil
+}
+
+// Register do register for ServiceInstance
+func (polaris *polarisServiceDiscovery) Register(instance
registry.ServiceInstance) error {
+
+ ins := convertToRegisterInstance(polaris.namespace, instance)
+ resp, err := polaris.provider.Register(ins)
+ if err != nil {
+ return perrors.WithMessage(err, "could not register the
instance. "+instance.GetServiceName())
+ }
+
+ if resp.Existed {
+ logger.Warnf("instance already registered, namespace:%+v, service:%+v, host:%+v, port:%+v",
+ polaris.namespace, instance.GetServiceName(),
instance.GetHost(), instance.GetPort())
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ go polaris.doHeartbeat(ctx, ins)
+
+ polaris.instanceLock.Lock()
+ defer polaris.instanceLock.Unlock()
+
+ polaris.registryInstances[getInstanceKey(polaris.namespace, instance)]
= &PolarisHeartbeat{
+ cancel: cancel,
+ instance: instance,
+ }
+ polaris.services.Add(instance.GetServiceName())
+
+ return nil
+}
+
+// Update update ServiceInstance info
+func (polaris *polarisServiceDiscovery) Update(instance
registry.ServiceInstance) error {
+ err := polaris.Unregister(instance)
+ if err != nil {
+ return perrors.WithStack(err)
+ }
+ polaris.services.Add(instance.GetServiceName())
+ return polaris.Register(instance)
+}
+
+// Unregister do Unregister for ServiceInstance
+func (polaris *polarisServiceDiscovery) Unregister(instance
registry.ServiceInstance) error {
+
+ func() {
+ polaris.instanceLock.Lock()
+ defer polaris.instanceLock.Unlock()
+
+ key := getInstanceKey(polaris.namespace, instance)
+ if heartbeat, exist := polaris.registryInstances[key]; exist {
+ heartbeat.cancel()
+ delete(polaris.registryInstances, key)
+ }
+ }()
+
+ err :=
polaris.provider.Deregister(convertToDeregisterInstance(polaris.namespace,
instance))
+ if err != nil {
+ return perrors.WithMessage(err, "Could not unregister the
instance. "+instance.GetServiceName())
+ }
+
+ polaris.services.Remove(instance.GetServiceName())
+ return nil
+}
+
+// GetDefaultPageSize
+func (polaris *polarisServiceDiscovery) GetDefaultPageSize() int {
+ return registry.DefaultPageSize
+}
+
+// GetServices
+func (polaris *polarisServiceDiscovery) GetServices() *gxset.HashSet {
+ return polaris.services
+}
+
+// GetInstances will return all service instances with serviceName
+func (polaris *polarisServiceDiscovery) GetInstances(serviceName string)
[]registry.ServiceInstance {
+ resp, err :=
polaris.consumer.GetAllInstances(&api.GetAllInstancesRequest{
+ GetAllInstancesRequest: model.GetAllInstancesRequest{
+ Service: serviceName,
+ Namespace: polaris.namespace,
+ },
+ })
+
+ if err != nil {
+ logger.Errorf("Could not query the instances for service: %+v .
It happened err %+v", serviceName, err)
+ return make([]registry.ServiceInstance, 0)
+ }
+ res := make([]registry.ServiceInstance, 0, len(resp.Instances))
+ for _, ins := range resp.Instances {
+ metadata := ins.GetMetadata()
+ res = append(res, &registry.DefaultServiceInstance{
+ ID: ins.GetId(),
+ ServiceName: serviceName,
+ Host: ins.GetHost(),
+ Port: int(ins.GetPort()),
+ Enable: !ins.IsIsolated(),
+ Healthy: ins.IsHealthy(),
+ Metadata: metadata,
+ })
+ }
+ return res
+}
+
+// GetInstancesByPage will return a page containing instances of
ServiceInstance with the serviceName
+// the page will start at offset
+func (polaris *polarisServiceDiscovery) GetInstancesByPage(serviceName string,
offset int, pageSize int) gxpage.Pager {
+ all := polaris.GetInstances(serviceName)
+ res := make([]interface{}, 0, pageSize)
+ for i := offset; i < len(all) && i < offset+pageSize; i++ {
+ res = append(res, all[i])
+ }
+ return gxpage.NewPage(offset, pageSize, res, len(all))
+}
+
+// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
+// The param healthy indicates whether the returned instances should be healthy or not.
+// The page will start at offset
+func (polaris *polarisServiceDiscovery) GetHealthyInstancesByPage(serviceName
string, offset int, pageSize int, healthy bool) gxpage.Pager {
+ all := polaris.GetInstances(serviceName)
+ res := make([]interface{}, 0, pageSize)
+ // could not use res = all[a:b] here because the res should be
[]interface{}, not []ServiceInstance
+ var (
+ i = offset
+ count = 0
+ )
+ for i < len(all) && count < pageSize {
+ ins := all[i]
+ if ins.IsHealthy() == healthy {
+ res = append(res, all[i])
+ count++
+ }
+ i++
+ }
+ return gxpage.NewPage(offset, pageSize, res, len(all))
+}
+
+// GetRequestInstances gets a page of instances for each of the specified service names
+func (polaris *polarisServiceDiscovery) GetRequestInstances(serviceNames
[]string, offset int, requestedSize int) map[string]gxpage.Pager {
+ res := make(map[string]gxpage.Pager, len(serviceNames))
+ for _, name := range serviceNames {
+ res[name] = polaris.GetInstancesByPage(name, offset,
requestedSize)
+ }
+ return res
+}
+
+// AddListener add listener for ServiceInstancesChangedListener
+func (polaris *polarisServiceDiscovery) AddListener(listener
registry.ServiceInstancesChangedListener) error {
+
+ for _, val := range listener.GetServiceNames().Values() {
+ serviceName := val.(string)
+ watcher, err :=
polaris.createPolarisWatcherIfAbsent(serviceName)
+ if err != nil {
+ return err
+ }
+
+ watcher.AddSubscriber(func(et remoting.EventType, instances
[]model.Instance) {
+ dubboInstances := make([]registry.ServiceInstance, 0,
len(instances))
+ for _, instance := range instances {
+ dubboInstances = append(dubboInstances, &registry.DefaultServiceInstance{
+ ID: instance.GetId(),
+ ServiceName: instance.GetService(),
+ Host: instance.GetHost(),
+ Port: int(instance.GetPort()),
+ Enable: !instance.IsIsolated(),
+ Healthy: instance.IsHealthy(),
+ Metadata: instance.GetMetadata(),
+ GroupName:
instance.GetMetadata()[constant.PolarisDubboGroup],
+ })
+ }
+
+
listener.OnEvent(registry.NewServiceInstancesChangedEvent(serviceName,
dubboInstances))
+ })
+ }
+
+ return nil
+}
+
+// createPolarisWatcherIfAbsent returns the PolarisServiceWatcher for serviceName,
+// creating it first if it does not exist yet
+func (polaris *polarisServiceDiscovery)
createPolarisWatcherIfAbsent(serviceName string) (*PolarisServiceWatcher,
error) {
+
+ polaris.listenerLock.Lock()
+ defer polaris.listenerLock.Unlock()
+
+ if _, exist := polaris.watchers[serviceName]; !exist {
+ subscribeParam := &api.WatchServiceRequest{
+ WatchServiceRequest: model.WatchServiceRequest{
+ Key: model.ServiceKey{
+ Namespace: polaris.namespace,
+ Service: serviceName,
+ },
+ },
+ }
+
+ watcher, err := newPolarisWatcher(subscribeParam,
polaris.consumer)
+ if err != nil {
+ return nil, err
+ }
+ polaris.watchers[serviceName] = watcher
+ }
+
+ return polaris.watchers[serviceName], nil
+}
+
+// String returns the descriptor
+func (polaris *polarisServiceDiscovery) String() string {
+ return polaris.descriptor
+}
+
+func convertToRegisterInstance(namespace string, instance
registry.ServiceInstance) *api.InstanceRegisterRequest {
+
+ health := instance.IsHealthy()
+ isolate := !instance.IsEnable()
+ ttl := 5
+
+ return &api.InstanceRegisterRequest{
+ InstanceRegisterRequest: model.InstanceRegisterRequest{
+ Service: instance.GetServiceName(),
+ Namespace: namespace,
+ Host: instance.GetHost(),
+ Port: instance.GetPort(),
+ Protocol: &protocolForDubboGO,
+ Metadata: instance.GetMetadata(),
+ Healthy: &health,
+ Isolate: &isolate,
+ TTL: &ttl,
+ },
+ }
+}
+
+func convertToDeregisterInstance(namespace string, instance
registry.ServiceInstance) *api.InstanceDeRegisterRequest {
+ return &api.InstanceDeRegisterRequest{
+ InstanceDeRegisterRequest: model.InstanceDeRegisterRequest{
+ Service: instance.GetServiceName(),
+ Namespace: namespace,
+ Host: instance.GetHost(),
+ Port: instance.GetPort(),
+ },
+ }
+}
+
+// doHeartbeat Since polaris does not support automatic reporting of instance
heartbeats, separate logic is
+// needed to implement it
+func (polaris *polarisServiceDiscovery) doHeartbeat(ctx context.Context, ins
*api.InstanceRegisterRequest) {
+ ticker := time.NewTicker(time.Duration(4) * time.Second)
+ defer ticker.Stop()
+
+ heartbeat := &api.InstanceHeartbeatRequest{
+ InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{
+ Service: ins.Service,
+ Namespace: ins.Namespace,
+ Host: ins.Host,
+ Port: ins.Port,
+ },
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ polaris.provider.Heartbeat(heartbeat)
+ }
+ }
+}
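
Note on GetInstancesByPage above: the offset/pageSize windowing can be sketched on its own as follows. This is an illustration under assumptions, not the commit's code: pageOf is a hypothetical helper, it works on plain strings rather than registry.ServiceInstance values, and it returns a slice instead of a gxpage.Pager. Unlike the loop in the commit it also guards against a negative offset and a non-positive page size.

```go
package main

// pageOf returns at most pageSize items of all, starting at offset.
// An out-of-range offset or a non-positive pageSize yields an empty page.
func pageOf(all []string, offset, pageSize int) []string {
	if offset < 0 || pageSize <= 0 || offset >= len(all) {
		return []string{}
	}

	end := offset + pageSize
	if end > len(all) {
		end = len(all) // the last page may be shorter than pageSize
	}

	// Copy so that callers cannot modify the backing array of all.
	page := make([]string, end-offset)
	copy(page, all[offset:end])
	return page
}
```

For a list of five items, pageOf(all, 1, 2) yields the second and third item, and pageOf(all, 4, 3) yields only the last one.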
diff --git a/registry/polaris/utils.go b/registry/polaris/utils.go
new file mode 100644
index 0000000..3295b61
--- /dev/null
+++ b/registry/polaris/utils.go
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+)
+
+var (
+ protocolForDubboGO string = "dubbo"
+)
+
+// PolarisHeartbeat tracks the heartbeat task of a registered instance or URL
+type PolarisHeartbeat struct {
+ cancel context.CancelFunc
+ instance registry.ServiceInstance
+ url *common.URL
+}
+
+func getInstanceKey(namespace string, instance registry.ServiceInstance)
string {
+ return fmt.Sprintf("%s-%s-%s-%d", namespace, instance.GetServiceName(),
instance.GetHost(), instance.GetPort())
+}
+
+// getCategory is copied from the dubbo-go nacos registry
+func getCategory(url *common.URL) string {
+ role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey,
strconv.Itoa(constant.PolarisDefaultRoleType)))
+ category := common.DubboNodes[role]
+ return category
+}
+
+// getServiceName is copied from the dubbo-go nacos registry
+func getServiceName(url *common.URL) string {
+ var buffer bytes.Buffer
+
+ buffer.Write([]byte(getCategory(url)))
+ appendParam(&buffer, url, constant.InterfaceKey)
+ return buffer.String()
+}
+
+// appendParam is copied from the dubbo-go nacos registry
+func appendParam(target *bytes.Buffer, url *common.URL, key string) {
+ value := url.GetParam(key, "")
+ target.Write([]byte(constant.PolarisServiceNameSeparator))
+ if strings.TrimSpace(value) != "" {
+ target.Write([]byte(value))
+ }
+}
diff --git a/remoting/polaris/builder.go b/remoting/polaris/builder.go
new file mode 100644
index 0000000..18f9e27
--- /dev/null
+++ b/remoting/polaris/builder.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+)
+
+import (
+ perrors "github.com/pkg/errors"
+
+ "github.com/polarismesh/polaris-go/api"
+ "github.com/polarismesh/polaris-go/pkg/config"
+ "github.com/polarismesh/polaris-go/pkg/model"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+)
+
+// GetPolarisConfig get polaris config from dubbo url
+func GetPolarisConfig(url *common.URL) (api.SDKContext, string, error) {
+ if url == nil {
+ return nil, "", errors.New("url is nil")
+ }
+
+ addresses := strings.Split(url.Location, ",")
+ serverConfigs := make([]string, 0, len(addresses))
+ for _, addr := range addresses {
+ ip, portStr, err := net.SplitHostPort(addr)
+ if err != nil {
+ return nil, "", perrors.WithMessagef(err, "split [%s]
", addr)
+ }
+ port, _ := strconv.Atoi(portStr)
+ serverConfigs = append(serverConfigs, fmt.Sprintf("%s:%d", ip,
uint64(port)))
+ }
+
+ polarisConf := config.NewDefaultConfiguration(serverConfigs)
+
+ confPath := url.GetParam(constant.PolarisConfigFilePath, "")
+ if confPath != "" && model.IsFile(confPath) {
+ complexConf, err := config.LoadConfigurationByFile(confPath)
+ if err != nil {
+ return nil, "", err
+ }
+ mergePolarisConfiguration(polarisConf, complexConf)
+
+ polarisConf = complexConf
+ }
+
+ sdkCtx, err := api.InitContextByConfig(polarisConf)
+ if err != nil {
+ return nil, "", err
+ }
+
+ return sdkCtx, url.GetParam(constant.PolarisNamespace,
constant.PolarisDefaultNamespace), nil
+}
+
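+// mergePolarisConfiguration merges the server addresses of easy and
+// complexConf, drops duplicates, and stores the result in complexConf.
+// The order of the merged addresses is not deterministic.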
+func mergePolarisConfiguration(easy, complexConf config.Configuration) {
+
+ easySvrList := easy.GetGlobal().GetServerConnector().GetAddresses()
+
+ complexSvrList :=
complexConf.GetGlobal().GetServerConnector().GetAddresses()
+
+ result := make(map[string]bool)
+
+ for i := range complexSvrList {
+ result[complexSvrList[i]] = true
+ }
+
+ for i := range easySvrList {
+ if _, exist := result[easySvrList[i]]; !exist {
+ result[easySvrList[i]] = true
+ }
+ }
+
+ finalSvrList := make([]string, 0)
+ for k := range result {
+ finalSvrList = append(finalSvrList, k)
+ }
+
+ complexConf.GetGlobal().GetServerConnector().SetAddresses(finalSvrList)
+}
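
Note on mergePolarisConfiguration above: the address merge can be sketched without the polaris-go config types. The helper mergeAddresses below is hypothetical and works on plain string slices. It differs from the commit in one deliberate way: it keeps first-seen order instead of iterating over a map, so the result is deterministic.

```go
package main

// mergeAddresses returns the union of primary and extra without duplicates.
// Addresses keep the order in which they are first seen, primary first.
func mergeAddresses(primary, extra []string) []string {
	seen := make(map[string]struct{}, len(primary)+len(extra))
	merged := make([]string, 0, len(primary)+len(extra))

	for _, list := range [][]string{primary, extra} {
		for _, addr := range list {
			if _, dup := seen[addr]; dup {
				continue // already present, skip the duplicate
			}
			seen[addr] = struct{}{}
			merged = append(merged, addr)
		}
	}
	return merged
}
```

With the addresses used by the tests in this commit, merging the list from polaris.yaml (127.0.0.2:8091) with the list from the URL (127.0.0.1:8091) gives both addresses once each, file entries first.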
diff --git a/remoting/polaris/builder_test.go b/remoting/polaris/builder_test.go
new file mode 100644
index 0000000..2cf4898
--- /dev/null
+++ b/remoting/polaris/builder_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package polaris
+
+import (
+ "net/url"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/config"
+)
+
+func TestGetPolarisConfig(t *testing.T) {
+
+ rc := &config.RemoteConfig{}
+ rc.Params = make(map[string]string)
+
+ rc.Protocol = "polaris"
+ rc.Address = "127.0.0.1:8091"
+
+ rc.Params[constant.PolarisNamespace] = "default"
+
+ url, err := rc.ToURL()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sdkCtx, namespace, err := GetPolarisConfig(url)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NotNil(t, sdkCtx, "SDKContext")
+
+ assert.Equal(t, "default", namespace, "namespace")
+ assert.ElementsMatch(t, []string{"127.0.0.1:8091"},
sdkCtx.GetConfig().GetGlobal().GetServerConnector().GetAddresses(), "server
address")
+}
+
+func TestGetPolarisConfigWithExternalFile(t *testing.T) {
+
+ rc := &config.RemoteConfig{}
+ rc.Params = make(map[string]string)
+
+ rc.Protocol = "polaris"
+ rc.Address = "127.0.0.1:8091"
+
+ rc.Params[constant.PolarisNamespace] = "default"
+ rc.Params[constant.PolarisConfigFilePath] = "./polaris.yaml"
+
+ url, err := rc.ToURL()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sdkCtx, namespace, err := GetPolarisConfig(url)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.NotNil(t, sdkCtx, "SDKContext")
+
+ assert.Equal(t, "default", namespace, "namespace")
+ assert.ElementsMatch(t, []string{"127.0.0.1:8091", "127.0.0.2:8091"},
sdkCtx.GetConfig().GetGlobal().GetServerConnector().GetAddresses(), "server
address")
+}
+
+func TestGetPolarisConfigByUrl(t *testing.T) {
+ regurl := getRegUrl()
+ sdkCtx, namespace, err := GetPolarisConfig(regurl)
+
+ assert.Nil(t, err)
+ assert.Equal(t, "default", namespace, "namespace")
+ assert.ElementsMatch(t, []string{"127.0.0.1:8091", "127.0.0.2:8091"},
sdkCtx.GetConfig().GetGlobal().GetServerConnector().GetAddresses(), "server
address")
+}
+
+func getRegUrl() *common.URL {
+
+ regurlMap := url.Values{}
+ regurlMap.Set(constant.PolarisNamespace, "default")
+ regurlMap.Set(constant.PolarisConfigFilePath, "./polaris.yaml")
+
+ regurl, _ := common.NewURL("registry://127.0.0.1:8091",
common.WithParams(regurlMap))
+
+ return regurl
+}
diff --git a/remoting/polaris/polaris.yaml b/remoting/polaris/polaris.yaml
new file mode 100644
index 0000000..98741f2
--- /dev/null
+++ b/remoting/polaris/polaris.yaml
@@ -0,0 +1,95 @@
+global:
+ system:
+ mode: 0
+ discoverCluster:
+ namespace: Polaris
+ service: polaris.discover
+ refreshInterval: 10m
+ healthCheckCluster:
+ namespace: Polaris
+ service: polaris.healthcheck
+ refreshInterval: 10m
+ monitorCluster:
+ namespace: Polaris
+ service: polaris.monitor
+ refreshInterval: 10m
+ api:
+ timeout: 1s
+ reportInterval: 10m
+ maxRetryTimes: 5
+ retryInterval: 1s
+ serverConnector:
+ addresses:
+ - 127.0.0.2:8091
+ protocol: grpc
+ connectTimeout: 500ms
+ messageTimeout: 1s
+ connectionIdleTimeout: 1s
+ requestQueueSize: 1000
+ serverSwitchInterval: 10m
+ plugin:
+ grpc:
+ maxCallRecvMsgSize: 52428800
+ statReporter:
+ enable: true
+ chain:
+ - stat2Monitor
+ - serviceCache
+ plugin:
+ stat2Monitor:
+ metricsReportWindow: 1m
+ metricsNumBuckets: 12
+ serviceCache:
+ reportInterval: 3m
+consumer:
+ localCache:
+ type: inmemory
+ serviceExpireTime: 24h
+ serviceRefreshInterval: 2s
+ persistDir: $HOME/polaris/backup
+ persistMaxWriteRetry: 5
+ persistMaxReadRetry: 1
+ persistRetryInterval: 1s
+ persistAvailableInterval: 60s
+ startUseFileCache: true
+ serviceRouter:
+ chain:
+ - ruleBasedRouter
+ - nearbyBasedRouter
+ plugin:
+ nearbyBasedRouter:
+ matchLevel: zone
+ ruleBasedRouter: {}
+ percentOfMinInstances: 0
+ enableRecoverAll: true
+ loadbalancer:
+ type: weightedRandom
+ plugin:
+ ringHash:
+ vnodeCount: 500
+ circuitBreaker:
+ enable: true
+ checkPeriod: 30s
+ requestCountAfterHalfOpen: 10
+ sleepWindow: 30s
+ successCountAfterHalfOpen: 8
+ recoverWindow: 60s
+ recoverNumBuckets: 10
+ chain:
+ - errorCount
+ - errorRate
+ plugin:
+ errorCount:
+ continuousErrorThreshold: 10
+ metricNumBuckets: 10
+ metricStatTimeWindow: 1m0s
+ errorRate:
+ errorRateThreshold: 0.5
+ metricNumBuckets: 5
+ metricStatTimeWindow: 1m0s
+ requestVolumeThreshold: 10
+ subscribe:
+ type: subscribeLocalChannel
+ plugin:
+ subscribeLocalChannel:
+ channelBufferSize: 50