This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 30154d7c feat: supports tcp/http/https for health check & fix: Fix
ignoring the domains field #667 (#668)
30154d7c is described below
commit 30154d7cc17d4c9753750ecabf7e24c25dc1be20
Author: Alan <[email protected]>
AuthorDate: Sat Apr 26 13:25:31 2025 +0800
feat: supports tcp/http/https for health check & fix: Fix ignoring the
domains field #667 (#668)
* endpoints support the domains config field
add health checks for HTTP and HTTPS
health check support for domains
* add test for tcp healthcheck
* fix licence
* update connection check logic
* bug fix
* fix import
* fix ci
* apply Copilot suggestions
* rename symbol
* rename symbol
* tidy CheckTcpConn logic & add test case
* delete unsafe IP address
---------
Co-authored-by: Xuetao Li <[email protected]>
---
pkg/cluster/healthcheck/healthcheck.go | 55 ++++++---
pkg/cluster/healthcheck/healthcheck_test.go | 178 +++++++++++++++++++++++++++
pkg/cluster/healthcheck/{tcp.go => http.go} | 23 ++--
pkg/cluster/healthcheck/{tcp.go => https.go} | 24 ++--
pkg/cluster/healthcheck/tcp.go | 16 +--
pkg/cluster/healthcheck/utils.go | 76 ++++++++++++
pkg/model/base.go | 3 +
pkg/model/health.go | 18 +--
8 files changed, 324 insertions(+), 69 deletions(-)
diff --git a/pkg/cluster/healthcheck/healthcheck.go
b/pkg/cluster/healthcheck/healthcheck.go
index 6d87e7ae..4f6a7cb1 100644
--- a/pkg/cluster/healthcheck/healthcheck.go
+++ b/pkg/cluster/healthcheck/healthcheck.go
@@ -19,18 +19,17 @@ package healthcheck
import (
"runtime/debug"
+ "strings"
"sync"
"sync/atomic"
"time"
)
-import (
- gxtime "github.com/dubbogo/gost/time"
-)
-
import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
+
+ gxtime "github.com/dubbogo/gost/time"
)
const (
@@ -52,14 +51,15 @@ type HealthChecker struct {
initialDelay time.Duration
cluster *model.ClusterConfig
unhealthyThreshold uint32
+ protocol string
}
// EndpointChecker is a wrapper of types.HealthCheckSession for health check
type EndpointChecker struct {
endpoint *model.Endpoint
HealthChecker *HealthChecker
- // TCP checker, can extend to http, grpc, dubbo or other protocol
checker
- tcpChecker *TCPChecker
+ // checker, todo can extend to TCP, http, grpc, dubbo or other protocol
checker
+ checker Checker
resp chan checkResponse
timeout chan bool
checkID uint64
@@ -73,6 +73,11 @@ type EndpointChecker struct {
once sync.Once
}
+type Checker interface {
+ CheckHealth() bool
+ OnTimeout()
+}
+
type checkResponse struct {
ID uint64
Healthy bool
@@ -108,6 +113,7 @@ func CreateHealthCheck(cluster *model.ClusterConfig, cfg
model.HealthCheckConfig
}
hc := &HealthChecker{
+ protocol: cfg.Protocol,
sessionConfig: cfg.SessionConfig,
cluster: cluster,
timeout: timeout,
@@ -162,8 +168,34 @@ func (hc *HealthChecker) stopCheck(endpoint
*model.Endpoint) {
}
func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker {
+ var checker Checker
+ protocol := strings.ToLower(hc.protocol)
+ switch protocol {
+ case "tcp":
+ checker = &TCPChecker{
+ address: endpoint.Address.GetAddress(),
+ timeout: hc.timeout,
+ }
+ case "http":
+ checker = &HTTPChecker{
+ address: endpoint.Address.GetAddress(),
+ timeout: hc.timeout,
+ }
+ case "https":
+ checker = &HTTPSChecker{
+ address: endpoint.Address.GetAddress(),
+ timeout: hc.timeout,
+ }
+ default:
+ logger.Warnf("[health check] %s health checker is not
implemented, using tcp checker", hc.protocol)
+ checker = &TCPChecker{
+ address: endpoint.Address.GetAddress(),
+ timeout: hc.timeout,
+ }
+ }
+
c := &EndpointChecker{
- tcpChecker: newTcpChecker(endpoint, hc.timeout),
+ checker: checker,
endpoint: endpoint,
HealthChecker: hc,
resp: make(chan checkResponse),
@@ -173,13 +205,6 @@ func newChecker(endpoint *model.Endpoint, hc
*HealthChecker) *EndpointChecker {
return c
}
-func newTcpChecker(endpoint *model.Endpoint, timeout time.Duration)
*TCPChecker {
- return &TCPChecker{
- addr: endpoint.Address.GetAddress(),
- timeout: timeout,
- }
-}
-
func (hc *HealthChecker) getCheckInterval() time.Duration {
return hc.intervalBase
}
@@ -279,7 +304,7 @@ func (c *EndpointChecker) OnCheck() {
c.checkTimeout = gxtime.AfterFunc(c.HealthChecker.timeout, c.OnTimeout)
c.resp <- checkResponse{
ID: id,
- Healthy: c.tcpChecker.CheckHealth(),
+ Healthy: c.checker.CheckHealth(),
}
}
diff --git a/pkg/cluster/healthcheck/healthcheck_test.go
b/pkg/cluster/healthcheck/healthcheck_test.go
new file mode 100644
index 00000000..c52bf23c
--- /dev/null
+++ b/pkg/cluster/healthcheck/healthcheck_test.go
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package healthcheck
+
+import (
+ "net"
+ "testing"
+ "time"
+)
+
+func TestNormalizeAddress(t *testing.T) {
+ tests := []struct {
+ name string
+ address string
+ port string
+ wantAddress string
+ wantErr bool
+ }{
+ {
+ name: "port is empty, address has port",
+ address: "localhost:8080",
+ port: "",
+ wantAddress: "localhost:8080",
+ wantErr: false,
+ },
+ {
+ name: "port is empty, address has no port",
+ address: "localhost",
+ port: "",
+ wantAddress: "",
+ wantErr: true,
+ },
+ {
+ name: "port is not empty, address has no port",
+ address: "localhost",
+ port: "80",
+ wantAddress: "localhost:80",
+ wantErr: false,
+ },
+ {
+ name: "port is not empty, address has same port",
+ address: "localhost:80",
+ port: "80",
+ wantAddress: "localhost:80",
+ wantErr: false,
+ },
+ {
+ name: "port is not empty, address has different
port",
+ address: "localhost:8080",
+ port: "80",
+ wantAddress: "localhost:80",
+ wantErr: false,
+ },
+ {
+ name: "invalid address format for empty port",
+ address: "[::1]", // IPv6 without port
+ port: "",
+ wantAddress: "",
+ wantErr: true,
+ },
+ {
+ name: "valid IPv6 address with port",
+ address: "[::1]:8080",
+ port: "",
+ wantAddress: "[::1]:8080",
+ wantErr: false,
+ },
+ {
+ name: "port is not empty, valid IPv6 address
without port",
+ address: "[::1]",
+ port: "80",
+ wantAddress: "[::1]:80",
+ wantErr: false,
+ },
+ {
+ name: "port is not empty, valid IPv6 address
with different port",
+ address: "[::1]:8080",
+ port: "80",
+ wantAddress: "[::1]:80",
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gotAddress, gotErr := normalizeAddress(tt.address,
tt.port)
+ if (gotErr != nil) != tt.wantErr {
+ t.Errorf("normalizeAddress(%q, %q) error = %v,
wantErr %v", tt.address, tt.port, gotErr, tt.wantErr)
+ return
+ }
+ if gotAddress != tt.wantAddress {
+ t.Errorf("normalizeAddress(%q, %q) gotAddress =
%q, want %q", tt.address, tt.port, gotAddress, tt.wantAddress)
+ }
+ })
+ }
+}
+
+func TestCheckTcpConn(t *testing.T) {
+ // We need a way to simulate a successful and a failed TCP connection.
+ // We can achieve this by setting up a temporary listener for the
success case
+ // and using an invalid address for the failure case.
+
+ // Success case: Set up a temporary listener
+ listener, err := net.Listen("tcp", "localhost:0") // Listen on a random
available port
+ if err != nil {
+ t.Fatalf("Failed to create listener: %v", err)
+ }
+ defer listener.Close()
+ addr := listener.Addr().String()
+ host, portStr, _ := net.SplitHostPort(addr)
+
+ t.Run("successful connection", func(t *testing.T) {
+ go func() {
+ conn, _ := listener.Accept() // Accept the incoming
connection
+ if conn != nil {
+ conn.Close()
+ }
+ }()
+ success := CheckTcpConn(host, portStr, 100*time.Millisecond)
+ if !success {
+ t.Errorf("CheckTcpConn(%q, %q, ...) should return true
for a successful connection", host, portStr)
+ }
+ })
+
+ // Failure case 1: Invalid address format
+ t.Run("failed connection due to invalid address format", func(t
*testing.T) {
+ success := CheckTcpConn("invalid address", "80",
100*time.Millisecond)
+ if success {
+ t.Errorf("CheckTcpConn(%q, %q, ...) should return false
for an invalid address format", "invalid address", "80")
+ }
+ })
+
+ // Failure case 2: Connection timeout
+ t.Run("failed connection due to timeout", func(t *testing.T) {
+ // Use a non-routable local address to ensure a timeout
+ success := CheckTcpConn("127.0.0.1", "80", 100*time.Millisecond)
+ if success {
+ t.Errorf("CheckTcpConn(%q, %q, ...) should return false
due to timeout", "127.0.0.1", "80")
+ }
+ })
+
+ // Test with empty port (should fail due to normalizeAddress)
+ t.Run("failed with empty port and no port in address", func(t
*testing.T) {
+ success := CheckTcpConn("localhost", "", 100*time.Millisecond)
+ if success {
+ t.Errorf("CheckTcpConn(%q, %q, ...) should return false
when port is empty and address has no port", "localhost", "")
+ }
+ })
+
+ // Test with empty port and address has port (should succeed)
+ t.Run("successful with empty port and port in address", func(t
*testing.T) {
+ go func() {
+ conn, _ := listener.Accept()
+ if conn != nil {
+ conn.Close()
+ }
+ }()
+ success := CheckTcpConn(addr, "", 100*time.Millisecond)
+ if !success {
+ t.Errorf("CheckTcpConn(%q, %q, ...) should return true
when port is empty and address has port", addr, "")
+ }
+ })
+}
diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/http.go
similarity index 69%
copy from pkg/cluster/healthcheck/tcp.go
copy to pkg/cluster/healthcheck/http.go
index 7d12670a..cf8feb70 100644
--- a/pkg/cluster/healthcheck/tcp.go
+++ b/pkg/cluster/healthcheck/http.go
@@ -14,30 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package healthcheck
import (
- "net"
"time"
)
-import (
- "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-type TCPChecker struct {
- addr string
+type HTTPChecker struct {
+ address string
timeout time.Duration
}
-func (s *TCPChecker) CheckHealth() bool {
- conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
- if err != nil {
- logger.Infof("[health check] tcp checker for host %s error:
%v", s.addr, err)
- return false
- }
- conn.Close()
- return true
+func (s *HTTPChecker) CheckHealth() bool {
+ tarAddr := s.address
+ return CheckTcpConn(tarAddr, "80", s.timeout)
}
-func (s *TCPChecker) OnTimeout() {}
+func (s *HTTPChecker) OnTimeout() {}
diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/https.go
similarity index 69%
copy from pkg/cluster/healthcheck/tcp.go
copy to pkg/cluster/healthcheck/https.go
index 7d12670a..b07544be 100644
--- a/pkg/cluster/healthcheck/tcp.go
+++ b/pkg/cluster/healthcheck/https.go
@@ -14,30 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package healthcheck
import (
- "net"
"time"
)
-import (
- "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-type TCPChecker struct {
- addr string
+type HTTPSChecker struct {
+ address string
timeout time.Duration
}
-func (s *TCPChecker) CheckHealth() bool {
- conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
- if err != nil {
- logger.Infof("[health check] tcp checker for host %s error:
%v", s.addr, err)
- return false
- }
- conn.Close()
- return true
+func (s *HTTPSChecker) CheckHealth() bool {
+ tarAddr := s.address
+ return CheckTcpConn(tarAddr, "443", s.timeout)
+
}
-func (s *TCPChecker) OnTimeout() {}
+func (s *HTTPSChecker) OnTimeout() {}
diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/tcp.go
index 7d12670a..7043dca3 100644
--- a/pkg/cluster/healthcheck/tcp.go
+++ b/pkg/cluster/healthcheck/tcp.go
@@ -14,30 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package healthcheck
import (
- "net"
"time"
)
-import (
- "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
type TCPChecker struct {
- addr string
+ address string
timeout time.Duration
}
func (s *TCPChecker) CheckHealth() bool {
- conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
- if err != nil {
- logger.Infof("[health check] tcp checker for host %s error:
%v", s.addr, err)
- return false
- }
- conn.Close()
- return true
+ return CheckTcpConn(s.address, "", s.timeout)
}
func (s *TCPChecker) OnTimeout() {}
diff --git a/pkg/cluster/healthcheck/utils.go b/pkg/cluster/healthcheck/utils.go
new file mode 100644
index 00000000..d079f3da
--- /dev/null
+++ b/pkg/cluster/healthcheck/utils.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package healthcheck
+
+import (
+ "net"
+ "strings"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+func CheckTcpConn(address string, port string, timeout time.Duration) bool {
+ normalizedAddress, err := normalizeAddress(address, port)
+ if err != nil {
+ logger.Infof("[health check] address format for address \"%s\"
failed, %s", address, err.Error())
+ return false
+ }
+
+ conn, err := net.DialTimeout("tcp", normalizedAddress, timeout)
+ if err != nil {
+ logger.Infof("[health check] health check for address \"%s\"
failed, %s", normalizedAddress, err.Error())
+ return false
+ }
+ defer conn.Close()
+ return true
+}
+
+// normalizeAddress normalizes the address by ensuring it has the correct port.
+// If the port field is empty, it will check if the address already has a port.
+// - If the address has a port, it will return the address as is.
+// - If the address does not have a port, it will return err.
+//
+// If the port field is not empty, it will check if the address's port matches
the provided port.
+// - If it matches, it will return the address as is.
+// - If it does not match, it will return the address with the new port.
+func normalizeAddress(address string, port string) (string, error) {
+ if port == "" {
+ _, _, err := net.SplitHostPort(address)
+ if err != nil {
+ return "", err
+ }
+ return address, nil
+ }
+
+ host, existingPort, err := net.SplitHostPort(address)
+ if err != nil {
+ if strings.Contains(err.Error(), "missing port in address") {
+ return net.JoinHostPort(strings.Trim(address, "[]"),
port), nil
+ }
+ return "", err
+ }
+
+ if existingPort != port {
+ return net.JoinHostPort(host, port), nil
+ }
+
+ return address, nil
+}
diff --git a/pkg/model/base.go b/pkg/model/base.go
index 79447f31..2612fb37 100644
--- a/pkg/model/base.go
+++ b/pkg/model/base.go
@@ -143,5 +143,8 @@ type (
)
func (a SocketAddress) GetAddress() string {
+ if len(a.Domains) > 0 {
+ return a.Domains[0]
+ }
return fmt.Sprintf("%s:%v", a.Address, a.Port)
}
diff --git a/pkg/model/health.go b/pkg/model/health.go
index df0eb396..7f260b94 100644
--- a/pkg/model/health.go
+++ b/pkg/model/health.go
@@ -19,15 +19,15 @@ package model
// HealthCheck
type HealthCheckConfig struct {
- Protocol string `json:"protocol,omitempty"`
- TimeoutConfig string `json:"timeout,omitempty"`
- IntervalConfig string `json:"interval,omitempty"`
- InitialDelaySeconds string
`json:"initial_delay_seconds,omitempty"`
- HealthyThreshold uint32
`json:"healthy_threshold,omitempty"`
- UnhealthyThreshold uint32
`json:"unhealthy_threshold,omitempty"`
- ServiceName string
`json:"service_name,omitempty"`
- SessionConfig map[string]interface{}
`json:"check_config,omitempty"`
- CommonCallbacks []string
`json:"common_callbacks,omitempty"`
+ Protocol string `yaml:"protocol"
json:"protocol,omitempty" mapstructure:"protocol"`
+ TimeoutConfig string `yaml:"timeout"
json:"timeout,omitempty" mapstructure:"timeout"`
+ IntervalConfig string `yaml:"interval"
json:"interval,omitempty" mapstructure:"interval"`
+ InitialDelaySeconds string
`yaml:"initial_delay_seconds" json:"initial_delay_seconds,omitempty"
mapstructure:"initial_delay_seconds"`
+ HealthyThreshold uint32 `yaml:"healthy_threshold"
json:"healthy_threshold,omitempty" mapstructure:"healthy_threshold"`
+ UnhealthyThreshold uint32 `yaml:"unhealthy_threshold"
json:"unhealthy_threshold,omitempty" mapstructure:"unhealthy_threshold"`
+ ServiceName string `yaml:"service_name"
json:"service_name,omitempty" mapstructure:"service_name"`
+ SessionConfig map[string]interface{} `yaml:"check_config"
json:"check_config,omitempty" mapstructure:"check_config"`
+ CommonCallbacks []string `yaml:"common_callbacks"
json:"common_callbacks,omitempty" mapstructure:"common_callbacks"`
}
// HttpHealthCheck