This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch refactor-with-go
in repository https://gitbox.apache.org/repos/asf/dubbo-admin.git
The following commit(s) were added to refs/heads/refactor-with-go by this push:
new d29ff753 Prometheus http_sd discovery and unit test (#1070)
d29ff753 is described below
commit d29ff753dd722cdfcfc9f6fbfaf9b85f86f4d029
Author: 无言独上机房 <[email protected]>
AuthorDate: Tue Apr 4 15:19:32 2023 +0800
Prometheus http_sd discovery and unit test (#1070)
---
pkg/admin/model/monitor.go | 5 +
pkg/admin/services/monitor_service.go | 7 +-
pkg/admin/services/prometheus_service_impl.go | 39 ++++-
pkg/admin/services/prometheus_service_impl_test.go | 171 +++++++++++++++++++++
.../{model/monitor.go => util/monitor_utils.go} | 13 +-
.../monitor.go => util/monitor_utils_test.go} | 42 ++++-
6 files changed, 266 insertions(+), 11 deletions(-)
diff --git a/pkg/admin/model/monitor.go b/pkg/admin/model/monitor.go
index 3ff6b40e..d4767841 100644
--- a/pkg/admin/model/monitor.go
+++ b/pkg/admin/model/monitor.go
@@ -19,3 +19,8 @@ type Response struct {
Status int `json:"status"`
Data string `json:"data"`
}
+
+type Target struct {
+ Targets []string `json:"targets"`
+ Labels map[string]string `json:"labels"`
+}
diff --git a/pkg/admin/services/monitor_service.go
b/pkg/admin/services/monitor_service.go
index 5c377d41..fdec3698 100644
--- a/pkg/admin/services/monitor_service.go
+++ b/pkg/admin/services/monitor_service.go
@@ -15,9 +15,14 @@
package services
-import "github.com/apache/dubbo-admin/pkg/admin/model"
+import (
+ "net/http"
+
+ "github.com/apache/dubbo-admin/pkg/admin/model"
+)
type MonitorService interface {
FlowMetrics() ([]model.Response, error) // Traffic overview
ClusterMetrics() ([]model.Response, error) // Cluster overview
+ PromDiscovery(w http.ResponseWriter) error // prometheus http_sd
discovery
}
diff --git a/pkg/admin/services/prometheus_service_impl.go
b/pkg/admin/services/prometheus_service_impl.go
index 0e570c6a..4b034945 100644
--- a/pkg/admin/services/prometheus_service_impl.go
+++ b/pkg/admin/services/prometheus_service_impl.go
@@ -17,6 +17,7 @@ package services
import (
"context"
+ "encoding/json"
"fmt"
"net/http"
"strconv"
@@ -28,17 +29,51 @@ import (
"github.com/apache/dubbo-admin/pkg/admin/config"
"github.com/apache/dubbo-admin/pkg/admin/constant"
"github.com/apache/dubbo-admin/pkg/admin/model"
+ "github.com/apache/dubbo-admin/pkg/admin/util"
"github.com/apache/dubbo-admin/pkg/logger"
"github.com/apache/dubbo-admin/pkg/monitor/prometheus"
)
var (
- providerService ProviderService = &ProviderServiceImpl{}
- consumerService ConsumerService = &ConsumerServiceImpl{}
+ providerService ProviderService = &ProviderServiceImpl{}
+ consumerService ConsumerService = &ConsumerServiceImpl{}
+ providerServiceImpl = &ProviderServiceImpl{}
)
type PrometheusServiceImpl struct{}
+func (p *PrometheusServiceImpl) PromDiscovery(w http.ResponseWriter) error {
+ w.Header().Set("Content-Type", "application/json")
+ // Reduce the call chain and improve performance.
+ proAddr, err := providerServiceImpl.findAddresses()
+ if err != nil {
+ logger.Sugar().Errorf("Error provider findAddresses: %v\n", err)
+ return err
+ }
+ var targets []string
+ for i := 0; i < len(proAddr); i++ {
+ targets = append(targets, util.GetDiscoveryPath(proAddr[i]))
+ }
+ filterCon := make(map[string]string)
+ filterCon[constant.CategoryKey] = constant.ConsumersCategory
+ servicesMap, err := util.FilterFromCategory(filterCon)
+ if err != nil {
+ logger.Sugar().Errorf("Error filter category: %v\n", err)
+ return err
+ }
+ for _, url := range servicesMap {
+ targets = append(targets, util.GetDiscoveryPath(url.Location))
+ }
+ target := []model.Target{
+ {
+ Targets: targets,
+ Labels: map[string]string{},
+ },
+ }
+ err = json.NewEncoder(w).Encode(target)
+ return err
+}
+
func (p *PrometheusServiceImpl) ClusterMetrics() ([]model.Response, error) {
res := make([]model.Response, 5)
applications, err := providerService.FindApplications()
diff --git a/pkg/admin/services/prometheus_service_impl_test.go
b/pkg/admin/services/prometheus_service_impl_test.go
new file mode 100644
index 00000000..46f5ae91
--- /dev/null
+++ b/pkg/admin/services/prometheus_service_impl_test.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "reflect"
+ "sync"
+ "testing"
+
+ "github.com/apache/dubbo-admin/pkg/admin/cache"
+ "github.com/apache/dubbo-admin/pkg/admin/constant"
+ "github.com/apache/dubbo-admin/pkg/admin/model"
+
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+var prometheusService MonitorService = &PrometheusServiceImpl{}
+
+type args struct {
+ address []string
+}
+
+type test struct {
+ name string
+ args args
+ want []model.Target
+ wantErr error
+}
+
+func initCache(test []test) {
+ proService := &sync.Map{}
+ conService := &sync.Map{}
+ // protest1
+ protest1QueryParams := url.Values{
+ constant.ApplicationKey: {"protest1QueryParams"},
+ }
+ protest1, _ := common.NewURL(test[0].args.address[0],
+ common.WithProtocol(constant.AdminProtocol),
+ common.WithParams(protest1QueryParams),
+ common.WithLocation(test[0].args.address[0]),
+ )
+ // protest2
+ protest2QueryParams := url.Values{
+ constant.ApplicationKey: {"protest2QueryParams"},
+ }
+ protest2, _ := common.NewURL(test[0].args.address[1],
+ common.WithProtocol(constant.AdminProtocol),
+ common.WithParams(protest2QueryParams),
+ common.WithLocation(test[0].args.address[1]),
+ )
+
+ contest1QueryParams := url.Values{
+ constant.ApplicationKey: {"protest1QueryParams"},
+ }
+ // consumer test1
+ contest1, _ := common.NewURL(test[0].args.address[2],
+ common.WithProtocol(constant.AdminProtocol),
+ common.WithParams(contest1QueryParams),
+ common.WithLocation(test[0].args.address[2]),
+ )
+ // consumer test2
+ contest2QueryParams := url.Values{
+ constant.ApplicationKey: {"protest2QueryParams"},
+ }
+ contest2, _ := common.NewURL(test[0].args.address[3],
+ common.WithProtocol(constant.AdminProtocol),
+ common.WithParams(contest2QueryParams),
+ common.WithLocation(test[0].args.address[3]),
+ )
+ proService.Store("providers", map[string]*common.URL{
+ "protest1": protest1,
+ "protest2": protest2,
+ })
+
+ conService.Store("consumers", map[string]*common.URL{
+ "contest1": contest1,
+ "contest2": contest2,
+ })
+
+ cache.InterfaceRegistryCache.Store(constant.ProvidersCategory,
proService)
+ cache.InterfaceRegistryCache.Store(constant.ConsumersCategory,
conService)
+}
+
+// Simulate Prometheus to send requests for http_sd service discovery.
+func initPromClient(url string) ([]byte, error) {
+ resp, err := http.Get(url)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ return body, nil
+}
+
+// Simulate Prometheus to periodically send requests to admin to realize
http_ds service discovery.
+func TestPrometheusServiceImpl_PromDiscovery(t *testing.T) {
+ tests := []test{
+ {
+ name: "TEST",
+ args: args{
+ address: []string{
+ "127.0.0.1:0",
+ "198.127.163.150:8080",
+ "198.127.163.153:0",
+ "198.127.163.151:0",
+ },
+ },
+ wantErr: nil,
+ want: []model.Target{
+ {
+ Labels: map[string]string{},
+ Targets: []string{
+ "127.0.0.1:22222",
+ "198.127.163.150:22222",
+ "198.127.163.153:22222",
+ "198.127.163.151:22222",
+ },
+ },
+ },
+ },
+ }
+ initCache(tests)
+ defer cache.InterfaceRegistryCache.Delete(constant.ProvidersCategory)
+ defer cache.InterfaceRegistryCache.Delete(constant.ConsumersCategory)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ts := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
+ err := prometheusService.PromDiscovery(w)
+ if err != nil {
+ t.Errorf("Server Start Error: %v\n",
err)
+ }
+ }))
+ defer ts.Close()
+ addr := ts.URL
+ resp, err := initPromClient(addr)
+ fmt.Println(string(resp))
+ if err != nil {
+ t.Errorf("Error: %v\n", err)
+ }
+ var target []model.Target
+ _ = json.Unmarshal(resp, &target)
+ if !reflect.DeepEqual(target, tt.want) {
+ t.Errorf("PromDiscovery() got = %v, want %v",
target, tt.want)
+ }
+ })
+ }
+}
diff --git a/pkg/admin/model/monitor.go b/pkg/admin/util/monitor_utils.go
similarity index 78%
copy from pkg/admin/model/monitor.go
copy to pkg/admin/util/monitor_utils.go
index 3ff6b40e..f7a328ab 100644
--- a/pkg/admin/model/monitor.go
+++ b/pkg/admin/util/monitor_utils.go
@@ -13,9 +13,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package model
+package util
-type Response struct {
- Status int `json:"status"`
- Data string `json:"data"`
+import "strings"
+
+func GetDiscoveryPath(address string) string {
+ if strings.Contains(address, ":") {
+ index := strings.Index(address, ":")
+ return address[0:index] + ":22222"
+ }
+ return address + ":22222"
}
diff --git a/pkg/admin/model/monitor.go b/pkg/admin/util/monitor_utils_test.go
similarity index 54%
copy from pkg/admin/model/monitor.go
copy to pkg/admin/util/monitor_utils_test.go
index 3ff6b40e..56044740 100644
--- a/pkg/admin/model/monitor.go
+++ b/pkg/admin/util/monitor_utils_test.go
@@ -13,9 +13,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package model
+package util
-type Response struct {
- Status int `json:"status"`
- Data string `json:"data"`
+import (
+ "reflect"
+ "testing"
+)
+
+func TestGetDiscoveryPath(t *testing.T) {
+ type args struct {
+ address string
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "RightTest1",
+ args: args{
+ address: "127.0.0.1:0",
+ },
+ want: "127.0.0.1:22222",
+ },
+ {
+ name: "RightTest2",
+ args: args{
+ address: "192.168.127.153",
+ },
+ want: "192.168.127.153:22222",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ path := GetDiscoveryPath(tt.args.address)
+ if !reflect.DeepEqual(path, tt.want) {
+ t.Errorf("GetDiscoveryPath() = %v, want %v",
path, tt.want)
+ }
+ })
+ }
}