This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 30154d7c feat: supports tcp/http/https for health check & fix: Fix 
ignoring the domains field #667 (#668)
30154d7c is described below

commit 30154d7cc17d4c9753750ecabf7e24c25dc1be20
Author: Alan <[email protected]>
AuthorDate: Sat Apr 26 13:25:31 2025 +0800

    feat: supports tcp/http/https for health check & fix: Fix ignoring the 
domains field #667 (#668)
    
    * endpoints support domain config field
    add health check for http and https
    health checkout support for domain
    
    * add test for tcp healthcheck
    
    * fix licence
    
    * update connection check logic
    
    * bug fix
    
    * fix import
    
    * fix ci
    
    * copilot suggest
    
    * rename symbol
    
    * rename symbol
    
    * tidy CheckTcpConn logic & add test case
    
    * delete none safe ip address
    
    ---------
    
    Co-authored-by: Xuetao Li <[email protected]>
---
 pkg/cluster/healthcheck/healthcheck.go       |  55 ++++++---
 pkg/cluster/healthcheck/healthcheck_test.go  | 178 +++++++++++++++++++++++++++
 pkg/cluster/healthcheck/{tcp.go => http.go}  |  23 ++--
 pkg/cluster/healthcheck/{tcp.go => https.go} |  24 ++--
 pkg/cluster/healthcheck/tcp.go               |  16 +--
 pkg/cluster/healthcheck/utils.go             |  76 ++++++++++++
 pkg/model/base.go                            |   3 +
 pkg/model/health.go                          |  18 +--
 8 files changed, 324 insertions(+), 69 deletions(-)

diff --git a/pkg/cluster/healthcheck/healthcheck.go 
b/pkg/cluster/healthcheck/healthcheck.go
index 6d87e7ae..4f6a7cb1 100644
--- a/pkg/cluster/healthcheck/healthcheck.go
+++ b/pkg/cluster/healthcheck/healthcheck.go
@@ -19,18 +19,17 @@ package healthcheck
 
 import (
        "runtime/debug"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
 )
 
-import (
-       gxtime "github.com/dubbogo/gost/time"
-)
-
 import (
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
+
+       gxtime "github.com/dubbogo/gost/time"
 )
 
 const (
@@ -52,14 +51,15 @@ type HealthChecker struct {
        initialDelay       time.Duration
        cluster            *model.ClusterConfig
        unhealthyThreshold uint32
+       protocol           string
 }
 
 // EndpointChecker is a wrapper of types.HealthCheckSession for health check
 type EndpointChecker struct {
        endpoint      *model.Endpoint
        HealthChecker *HealthChecker
-       // TCP checker, can extend to http, grpc, dubbo or other protocol 
checker
-       tcpChecker    *TCPChecker
+       // checker, todo can extend to TCP, http, grpc, dubbo or other protocol 
checker
+       checker       Checker
        resp          chan checkResponse
        timeout       chan bool
        checkID       uint64
@@ -73,6 +73,11 @@ type EndpointChecker struct {
        once sync.Once
 }
 
+type Checker interface {
+       CheckHealth() bool
+       OnTimeout()
+}
+
 type checkResponse struct {
        ID      uint64
        Healthy bool
@@ -108,6 +113,7 @@ func CreateHealthCheck(cluster *model.ClusterConfig, cfg 
model.HealthCheckConfig
        }
 
        hc := &HealthChecker{
+               protocol:           cfg.Protocol,
                sessionConfig:      cfg.SessionConfig,
                cluster:            cluster,
                timeout:            timeout,
@@ -162,8 +168,34 @@ func (hc *HealthChecker) stopCheck(endpoint 
*model.Endpoint) {
 }
 
 func newChecker(endpoint *model.Endpoint, hc *HealthChecker) *EndpointChecker {
+       var checker Checker
+       protocol := strings.ToLower(hc.protocol)
+       switch protocol {
+       case "tcp":
+               checker = &TCPChecker{
+                       address: endpoint.Address.GetAddress(),
+                       timeout: hc.timeout,
+               }
+       case "http":
+               checker = &HTTPChecker{
+                       address: endpoint.Address.GetAddress(),
+                       timeout: hc.timeout,
+               }
+       case "https":
+               checker = &HTTPSChecker{
+                       address: endpoint.Address.GetAddress(),
+                       timeout: hc.timeout,
+               }
+       default:
+               logger.Warnf("[health check] %s health checker is not 
implemented, using tcp checker", hc.protocol)
+               checker = &TCPChecker{
+                       address: endpoint.Address.GetAddress(),
+                       timeout: hc.timeout,
+               }
+       }
+
        c := &EndpointChecker{
-               tcpChecker:    newTcpChecker(endpoint, hc.timeout),
+               checker:       checker,
                endpoint:      endpoint,
                HealthChecker: hc,
                resp:          make(chan checkResponse),
@@ -173,13 +205,6 @@ func newChecker(endpoint *model.Endpoint, hc 
*HealthChecker) *EndpointChecker {
        return c
 }
 
-func newTcpChecker(endpoint *model.Endpoint, timeout time.Duration) 
*TCPChecker {
-       return &TCPChecker{
-               addr:    endpoint.Address.GetAddress(),
-               timeout: timeout,
-       }
-}
-
 func (hc *HealthChecker) getCheckInterval() time.Duration {
        return hc.intervalBase
 }
@@ -279,7 +304,7 @@ func (c *EndpointChecker) OnCheck() {
        c.checkTimeout = gxtime.AfterFunc(c.HealthChecker.timeout, c.OnTimeout)
        c.resp <- checkResponse{
                ID:      id,
-               Healthy: c.tcpChecker.CheckHealth(),
+               Healthy: c.checker.CheckHealth(),
        }
 }
 
diff --git a/pkg/cluster/healthcheck/healthcheck_test.go 
b/pkg/cluster/healthcheck/healthcheck_test.go
new file mode 100644
index 00000000..c52bf23c
--- /dev/null
+++ b/pkg/cluster/healthcheck/healthcheck_test.go
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package healthcheck
+
+import (
+       "net"
+       "testing"
+       "time"
+)
+
+func TestNormalizeAddress(t *testing.T) {
+       tests := []struct {
+               name        string
+               address     string
+               port        string
+               wantAddress string
+               wantErr     bool
+       }{
+               {
+                       name:        "port is empty, address has port",
+                       address:     "localhost:8080",
+                       port:        "",
+                       wantAddress: "localhost:8080",
+                       wantErr:     false,
+               },
+               {
+                       name:        "port is empty, address has no port",
+                       address:     "localhost",
+                       port:        "",
+                       wantAddress: "",
+                       wantErr:     true,
+               },
+               {
+                       name:        "port is not empty, address has no port",
+                       address:     "localhost",
+                       port:        "80",
+                       wantAddress: "localhost:80",
+                       wantErr:     false,
+               },
+               {
+                       name:        "port is not empty, address has same port",
+                       address:     "localhost:80",
+                       port:        "80",
+                       wantAddress: "localhost:80",
+                       wantErr:     false,
+               },
+               {
+                       name:        "port is not empty, address has different 
port",
+                       address:     "localhost:8080",
+                       port:        "80",
+                       wantAddress: "localhost:80",
+                       wantErr:     false,
+               },
+               {
+                       name:        "invalid address format for empty port",
+                       address:     "[::1]", // IPv6 without port
+                       port:        "",
+                       wantAddress: "",
+                       wantErr:     true,
+               },
+               {
+                       name:        "valid IPv6 address with port",
+                       address:     "[::1]:8080",
+                       port:        "",
+                       wantAddress: "[::1]:8080",
+                       wantErr:     false,
+               },
+               {
+                       name:        "port is not empty, valid IPv6 address 
without port",
+                       address:     "[::1]",
+                       port:        "80",
+                       wantAddress: "[::1]:80",
+                       wantErr:     false,
+               },
+               {
+                       name:        "port is not empty, valid IPv6 address 
with different port",
+                       address:     "[::1]:8080",
+                       port:        "80",
+                       wantAddress: "[::1]:80",
+                       wantErr:     false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       gotAddress, gotErr := normalizeAddress(tt.address, 
tt.port)
+                       if (gotErr != nil) != tt.wantErr {
+                               t.Errorf("normalizeAddress(%q, %q) error = %v, 
wantErr %v", tt.address, tt.port, gotErr, tt.wantErr)
+                               return
+                       }
+                       if gotAddress != tt.wantAddress {
+                               t.Errorf("normalizeAddress(%q, %q) gotAddress = 
%q, want %q", tt.address, tt.port, gotAddress, tt.wantAddress)
+                       }
+               })
+       }
+}
+
+func TestCheckTcpConn(t *testing.T) {
+       // We need a way to simulate a successful and a failed TCP connection.
+       // We can achieve this by setting up a temporary listener for the 
success case
+       // and using an invalid address for the failure case.
+
+       // Success case: Set up a temporary listener
+       listener, err := net.Listen("tcp", "localhost:0") // Listen on a random 
available port
+       if err != nil {
+               t.Fatalf("Failed to create listener: %v", err)
+       }
+       defer listener.Close()
+       addr := listener.Addr().String()
+       host, portStr, _ := net.SplitHostPort(addr)
+
+       t.Run("successful connection", func(t *testing.T) {
+               go func() {
+                       conn, _ := listener.Accept() // Accept the incoming 
connection
+                       if conn != nil {
+                               conn.Close()
+                       }
+               }()
+               success := CheckTcpConn(host, portStr, 100*time.Millisecond)
+               if !success {
+                       t.Errorf("CheckTcpConn(%q, %q, ...) should return true 
for a successful connection", host, portStr)
+               }
+       })
+
+       // Failure case 1: Invalid address format
+       t.Run("failed connection due to invalid address format", func(t 
*testing.T) {
+               success := CheckTcpConn("invalid address", "80", 
100*time.Millisecond)
+               if success {
+                       t.Errorf("CheckTcpConn(%q, %q, ...) should return false 
for an invalid address format", "invalid address", "80")
+               }
+       })
+
+       // Failure case 2: Connection timeout
+       t.Run("failed connection due to timeout", func(t *testing.T) {
+               // Use a non-routable local address to ensure a timeout
+               success := CheckTcpConn("127.0.0.1", "80", 100*time.Millisecond)
+               if success {
+                       t.Errorf("CheckTcpConn(%q, %q, ...) should return false 
due to timeout", "127.0.0.1", "80")
+               }
+       })
+
+       // Test with empty port (should fail due to normalizeAddress)
+       t.Run("failed with empty port and no port in address", func(t 
*testing.T) {
+               success := CheckTcpConn("localhost", "", 100*time.Millisecond)
+               if success {
+                       t.Errorf("CheckTcpConn(%q, %q, ...) should return false 
when port is empty and address has no port", "localhost", "")
+               }
+       })
+
+       // Test with empty port and address has port (should succeed)
+       t.Run("successful with empty port and port in address", func(t 
*testing.T) {
+               go func() {
+                       conn, _ := listener.Accept()
+                       if conn != nil {
+                               conn.Close()
+                       }
+               }()
+               success := CheckTcpConn(addr, "", 100*time.Millisecond)
+               if !success {
+                       t.Errorf("CheckTcpConn(%q, %q, ...) should return true 
when port is empty and address has port", addr, "")
+               }
+       })
+}
diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/http.go
similarity index 69%
copy from pkg/cluster/healthcheck/tcp.go
copy to pkg/cluster/healthcheck/http.go
index 7d12670a..cf8feb70 100644
--- a/pkg/cluster/healthcheck/tcp.go
+++ b/pkg/cluster/healthcheck/http.go
@@ -14,30 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package healthcheck
 
 import (
-       "net"
        "time"
 )
 
-import (
-       "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-type TCPChecker struct {
-       addr    string
+type HTTPChecker struct {
+       address string
        timeout time.Duration
 }
 
-func (s *TCPChecker) CheckHealth() bool {
-       conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
-       if err != nil {
-               logger.Infof("[health check] tcp checker for host %s error: 
%v", s.addr, err)
-               return false
-       }
-       conn.Close()
-       return true
+func (s *HTTPChecker) CheckHealth() bool {
+       tarAddr := s.address
+       return CheckTcpConn(tarAddr, "80", s.timeout)
 }
 
-func (s *TCPChecker) OnTimeout() {}
+func (s *HTTPChecker) OnTimeout() {}
diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/https.go
similarity index 69%
copy from pkg/cluster/healthcheck/tcp.go
copy to pkg/cluster/healthcheck/https.go
index 7d12670a..b07544be 100644
--- a/pkg/cluster/healthcheck/tcp.go
+++ b/pkg/cluster/healthcheck/https.go
@@ -14,30 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package healthcheck
 
 import (
-       "net"
        "time"
 )
 
-import (
-       "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-type TCPChecker struct {
-       addr    string
+type HTTPSChecker struct {
+       address string
        timeout time.Duration
 }
 
-func (s *TCPChecker) CheckHealth() bool {
-       conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
-       if err != nil {
-               logger.Infof("[health check] tcp checker for host %s error: 
%v", s.addr, err)
-               return false
-       }
-       conn.Close()
-       return true
+func (s *HTTPSChecker) CheckHealth() bool {
+       tarAddr := s.address
+       return CheckTcpConn(tarAddr, "443", s.timeout)
+
 }
 
-func (s *TCPChecker) OnTimeout() {}
+func (s *HTTPSChecker) OnTimeout() {}
diff --git a/pkg/cluster/healthcheck/tcp.go b/pkg/cluster/healthcheck/tcp.go
index 7d12670a..7043dca3 100644
--- a/pkg/cluster/healthcheck/tcp.go
+++ b/pkg/cluster/healthcheck/tcp.go
@@ -14,30 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package healthcheck
 
 import (
-       "net"
        "time"
 )
 
-import (
-       "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
 type TCPChecker struct {
-       addr    string
+       address string
        timeout time.Duration
 }
 
 func (s *TCPChecker) CheckHealth() bool {
-       conn, err := net.DialTimeout("tcp", s.addr, s.timeout)
-       if err != nil {
-               logger.Infof("[health check] tcp checker for host %s error: 
%v", s.addr, err)
-               return false
-       }
-       conn.Close()
-       return true
+       return CheckTcpConn(s.address, "", s.timeout)
 }
 
 func (s *TCPChecker) OnTimeout() {}
diff --git a/pkg/cluster/healthcheck/utils.go b/pkg/cluster/healthcheck/utils.go
new file mode 100644
index 00000000..d079f3da
--- /dev/null
+++ b/pkg/cluster/healthcheck/utils.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package healthcheck
+
+import (
+       "net"
+       "strings"
+       "time"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+func CheckTcpConn(address string, port string, timeout time.Duration) bool {
+       normalizedAddress, err := normalizeAddress(address, port)
+       if err != nil {
+               logger.Infof("[health check] address format for address \"%s\" 
failed, %s", address, err.Error())
+               return false
+       }
+
+       conn, err := net.DialTimeout("tcp", normalizedAddress, timeout)
+       if err != nil {
+               logger.Infof("[health check] health check for address \"%s\" 
failed, %s", normalizedAddress, err.Error())
+               return false
+       }
+       defer conn.Close()
+       return true
+}
+
+// normalizeAddress normalizes the address by ensuring it has the correct port.
+// If the port field is empty, it will check if the address already has a port.
+//   - If the address has a port, it will return the address as is.
+//   - If the address does not have a port, it will return err.
+//
+// If the port field is not empty, it will check if the address's port matches 
the provided port.
+//   - If it matches, it will return the address as is.
+//   - If it does not match, it will return the address with the new port.
+func normalizeAddress(address string, port string) (string, error) {
+       if port == "" {
+               _, _, err := net.SplitHostPort(address)
+               if err != nil {
+                       return "", err
+               }
+               return address, nil
+       }
+
+       host, existingPort, err := net.SplitHostPort(address)
+       if err != nil {
+               if strings.Contains(err.Error(), "missing port in address") {
+                       return net.JoinHostPort(strings.Trim(address, "[]"), 
port), nil
+               }
+               return "", err
+       }
+
+       if existingPort != port {
+               return net.JoinHostPort(host, port), nil
+       }
+
+       return address, nil
+}
diff --git a/pkg/model/base.go b/pkg/model/base.go
index 79447f31..2612fb37 100644
--- a/pkg/model/base.go
+++ b/pkg/model/base.go
@@ -143,5 +143,8 @@ type (
 )
 
 func (a SocketAddress) GetAddress() string {
+       if len(a.Domains) > 0 {
+               return a.Domains[0]
+       }
        return fmt.Sprintf("%s:%v", a.Address, a.Port)
 }
diff --git a/pkg/model/health.go b/pkg/model/health.go
index df0eb396..7f260b94 100644
--- a/pkg/model/health.go
+++ b/pkg/model/health.go
@@ -19,15 +19,15 @@ package model
 
 // HealthCheck
 type HealthCheckConfig struct {
-       Protocol            string                 `json:"protocol,omitempty"`
-       TimeoutConfig       string                 `json:"timeout,omitempty"`
-       IntervalConfig      string                 `json:"interval,omitempty"`
-       InitialDelaySeconds string                 
`json:"initial_delay_seconds,omitempty"`
-       HealthyThreshold    uint32                 
`json:"healthy_threshold,omitempty"`
-       UnhealthyThreshold  uint32                 
`json:"unhealthy_threshold,omitempty"`
-       ServiceName         string                 
`json:"service_name,omitempty"`
-       SessionConfig       map[string]interface{} 
`json:"check_config,omitempty"`
-       CommonCallbacks     []string               
`json:"common_callbacks,omitempty"`
+       Protocol            string                 `yaml:"protocol" 
json:"protocol,omitempty" mapstructure:"protocol"`
+       TimeoutConfig       string                 `yaml:"timeout" 
json:"timeout,omitempty" mapstructure:"timeout"`
+       IntervalConfig      string                 `yaml:"interval" 
json:"interval,omitempty" mapstructure:"interval"`
+       InitialDelaySeconds string                 
`yaml:"initial_delay_seconds" json:"initial_delay_seconds,omitempty" 
mapstructure:"initial_delay_seconds"`
+       HealthyThreshold    uint32                 `yaml:"healthy_threshold" 
json:"healthy_threshold,omitempty" mapstructure:"healthy_threshold"`
+       UnhealthyThreshold  uint32                 `yaml:"unhealthy_threshold" 
json:"unhealthy_threshold,omitempty" mapstructure:"unhealthy_threshold"`
+       ServiceName         string                 `yaml:"service_name" 
json:"service_name,omitempty" mapstructure:"service_name"`
+       SessionConfig       map[string]interface{} `yaml:"check_config" 
json:"check_config,omitempty" mapstructure:"check_config"`
+       CommonCallbacks     []string               `yaml:"common_callbacks" 
json:"common_callbacks,omitempty" mapstructure:"common_callbacks"`
 }
 
 // HttpHealthCheck

Reply via email to