This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9b93fccf feat: Support dynamic update of routing configuration (#658)
9b93fccf is described below

commit 9b93fccf1e28eca1483bf8e8f57129d29c8e0f4c
Author: 1kasa <[email protected]>
AuthorDate: Fri May 2 11:46:51 2025 +0800

    feat: Support dynamic update of routing configuration (#658)
    
    * Support dynamic update of routing configuration
    
    * Support dynamic update of routing configuration
    
    * fix: wrong commit
    
    * fix: wrong commit
    
    * fix: wrong commit
    
    * fix: comment
    
    * fix: comment
    
    * handle ci error
    
    * handle error
    
    * handle ci error
    
    * handle comment
    
    * handle import
    
    * feat testcase
---
 pkg/common/router/trie/trie_test.go |  36 +++++++++
 pkg/hotreload/hotreload.go          |   3 +-
 pkg/hotreload/logger.go             |  10 +--
 pkg/hotreload/route_reloader.go     | 145 ++++++++++++++++++++++++++++++++++++
 pkg/listener/http/http_listener.go  |   1 +
 pkg/server/listener_manager.go      |   3 +
 pkg/server/router_manager.go        |  59 +++++++++++++++
 7 files changed, 251 insertions(+), 6 deletions(-)

diff --git a/pkg/common/router/trie/trie_test.go 
b/pkg/common/router/trie/trie_test.go
index 3b3aa40f..0e6a7080 100644
--- a/pkg/common/router/trie/trie_test.go
+++ b/pkg/common/router/trie/trie_test.go
@@ -206,3 +206,39 @@ func TestTrie_ParamMatch(t *testing.T) {
        assert.Equal(t, "", node.GetBizInfo())
        assert.True(t, ret)
 }
+
+// TestTrieSoftDelete verifies that removing a single path makes that path
+// (with or without a trailing slash) unmatchable while a deeper path and a
+// sibling path registered alongside it still match.
+func TestTrieSoftDelete(t *testing.T) {
+       tr := NewTrie()
+
+       // A parent path, a child beneath it, and a sibling.
+       seeds := []struct{ path, biz string }{
+               {"/a/b/c", "route1"},
+               {"/a/b/c/d", "route2"},
+               {"/a/b/e", "route3"},
+       }
+       for _, s := range seeds {
+               added, _ := tr.Put(s.path, s.biz)
+               assert.True(t, added)
+       }
+
+       // The parent is matchable before removal.
+       hit, _, found := tr.Match("/a/b/c")
+       assert.True(t, found)
+       assert.Equal(t, "route1", hit.GetBizInfo())
+
+       _, removeErr := tr.Remove("/a/b/c")
+       assert.NoError(t, removeErr)
+
+       // The removed path no longer matches.
+       hit, _, found = tr.Match("/a/b/c")
+       assert.False(t, found)
+       assert.Nil(t, hit)
+
+       // The child and the sibling are unaffected by the removal.
+       for _, s := range seeds[1:] {
+               hit, _, found = tr.Match(s.path)
+               assert.True(t, found)
+               assert.Equal(t, s.biz, hit.GetBizInfo())
+       }
+
+       // A trailing slash on the removed path does not match either.
+       hit, _, found = tr.Match("/a/b/c/")
+       assert.False(t, found)
+       assert.Nil(t, hit)
+}
diff --git a/pkg/hotreload/hotreload.go b/pkg/hotreload/hotreload.go
index cd6acce5..133a234e 100644
--- a/pkg/hotreload/hotreload.go
+++ b/pkg/hotreload/hotreload.go
@@ -46,7 +46,7 @@ type Coordinator struct {
        manager   *config.ConfigManager // Configuration manager
 }
 
-var coordinator = Coordinator{reloaders: []HotReloader{&LoggerReloader{}}}
+var coordinator = Coordinator{reloaders: []HotReloader{&LoggerReloader{}, 
&RouteReloader{}}}
 
 // StartHotReload initializes the hot reload process.
 // It should be called when the project starts, e.g., in cmd/gateway.go.
@@ -58,6 +58,7 @@ func StartHotReload(manager *config.ConfigManager, boot 
*model.Bootstrap) {
 
        coordinator.manager = manager
        coordinator.boot = boot
+
        go coordinator.HotReload()
 }
 
diff --git a/pkg/hotreload/logger.go b/pkg/hotreload/logger.go
index 45430cf3..eb2a024f 100644
--- a/pkg/hotreload/logger.go
+++ b/pkg/hotreload/logger.go
@@ -44,25 +44,25 @@ func (r *LoggerReloader) CheckUpdate(oldConfig, newConfig 
*model.Bootstrap) bool
                oc.DisableCaller != nc.DisableCaller ||
                oc.DisableStacktrace != nc.DisableStacktrace ||
                oc.Encoding != nc.Encoding {
-               return false
+               return true
        }
 
        // Check sampling configuration.
        if !r.checkSampling(oc.Sampling, nc.Sampling) {
-               return false
+               return true
        }
 
        // Check encoder configuration.
        if !r.checkEncoderConfig(oc.EncoderConfig, nc.EncoderConfig) {
-               return false
+               return true
        }
 
        // Check output paths.
        if !equal(oc.OutputPaths, nc.OutputPaths) {
-               return false
+               return true
        }
 
-       return true
+       return false
 }
 
 // HotReload applies the new logger configuration.
diff --git a/pkg/hotreload/route_reloader.go b/pkg/hotreload/route_reloader.go
new file mode 100644
index 00000000..53a1ac30
--- /dev/null
+++ b/pkg/hotreload/route_reloader.go
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hotreload
+
+import (
+       "encoding/json"
+)
+
+import (
+       "github.com/pkg/errors"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+       "github.com/apache/dubbo-go-pixiu/pkg/server"
+)
+
+// RouteReloader implements the HotReloader interface for reloading route configurations.
+// It carries no state: each call works only from the Bootstrap configs passed to it.
+type RouteReloader struct{}
+
+// CheckUpdate reports whether the route configuration extracted from newConfig
+// differs from the one extracted from oldConfig, i.e. whether a route reload
+// is needed.
+//
+// Routes are compared position by position. A different number of routes, a
+// different Dynamic flag, or any route whose match prefix, match path or
+// target cluster changed counts as an update.
+func (r *RouteReloader) CheckUpdate(oldConfig, newConfig *model.Bootstrap) bool {
+       oldRoutes := extractRoutes(oldConfig)
+       newRoutes := extractRoutes(newConfig)
+
+       // A change in route count or in the Dynamic flag is always an update.
+       if len(oldRoutes.Routes) != len(newRoutes.Routes) || oldRoutes.Dynamic != newRoutes.Dynamic {
+               return true
+       }
+
+       // Both slices have the same length here, so indexing the old one with i is safe.
+       for i, next := range newRoutes.Routes {
+               prev := oldRoutes.Routes[i]
+               // Path must be compared as well as Prefix: validateRoute accepts a route
+               // that sets only Match.Path, and such a route would otherwise never be
+               // seen as changed.
+               if prev.Match.Prefix != next.Match.Prefix ||
+                       prev.Match.Path != next.Match.Path ||
+                       prev.Route.Cluster != next.Route.Cluster {
+                       return true
+               }
+       }
+       return false
+}
+
+// HotReload applies the new route configuration.
+//
+// It extracts the routes from oldConfig and newConfig and hands both sets to
+// the server's RouterManager via UpdateRoutes. The error returned by
+// UpdateRoutes, if any, is logged and returned unchanged.
+func (r *RouteReloader) HotReload(oldConfig, newConfig *model.Bootstrap) error {
+       oldRoutes := extractRoutes(oldConfig)
+       newRoutes := extractRoutes(newConfig)
+
+       // Update routes in the RouterManager
+       if err := server.GetRouterManager().UpdateRoutes(oldRoutes.Routes, newRoutes.Routes); err != nil {
+               // A failed reload is an error, not an informational event; include the cause.
+               logger.Errorf("Failed to reload routes: %v", err)
+               return err
+       }
+
+       return nil
+}
+
+// extractRoutes extracts routes from the configuration by parsing the filters.
+//
+// It walks every listener's filter chain and, for each filter named
+// constant.HTTPConnectManagerFilter, decodes the filter's "route_config" entry
+// into a model.RouteConfiguration by round-tripping it through JSON. Routes
+// that are nil or fail validateRoute are dropped. The first filter that yields
+// at least one valid route determines the result.
+//
+// If no filter yields a valid route, the returned configuration has no routes;
+// its other fields come from the last filter that decoded successfully, or are
+// zero if none did. A nil config yields the zero value.
+func extractRoutes(config *model.Bootstrap) model.RouteConfiguration {
+       var (
+               routeConfig     model.RouteConfiguration
+               invalidRouteIDs []string
+       )
+       if config == nil {
+               return routeConfig
+       }
+
+       for _, listener := range config.StaticResources.Listeners {
+               for _, filterChain := range listener.FilterChain.Filters {
+                       if filterChain.Name != constant.HTTPConnectManagerFilter {
+                               continue
+                       }
+
+                       // Extract route_config
+                       rawRouteConfig, ok := filterChain.Config["route_config"]
+                       if !ok {
+                               logger.Debugf("No route_config found in filter chain: %+v", filterChain)
+                               continue
+                       }
+                       logger.Debugf("Raw route_config: %+v", rawRouteConfig)
+
+                       // Convert route_config to JSON bytes
+                       routeConfigBytes, err := json.Marshal(rawRouteConfig)
+                       if err != nil {
+                               logger.Errorf("Failed to marshal route_config: %v", err)
+                               continue
+                       }
+
+                       // Decode into a fresh value for every filter: json.Unmarshal may leave
+                       // a partially filled target on error, and reusing one target would let
+                       // such unvalidated data (or fields of an earlier filter) leak into the
+                       // result.
+                       var parsed model.RouteConfiguration
+                       if err := json.Unmarshal(routeConfigBytes, &parsed); err != nil {
+                               logger.Errorf("Failed to unmarshal route_config: %v", err)
+                               continue
+                       }
+
+                       logger.Debugf("Parsed route_config: %+v", parsed)
+
+                       // Validate and filter routes
+                       validRoutes := make([]*model.Router, 0, len(parsed.Routes))
+                       for _, route := range parsed.Routes {
+                               // A JSON null in the routes array decodes to a nil pointer.
+                               if route == nil {
+                                       logger.Warnf("Skipping nil route entry in route_config")
+                                       continue
+                               }
+                               if err := validateRoute(route); err != nil {
+                                       invalidRouteIDs = append(invalidRouteIDs, route.ID)
+                                       logger.Warnf("Skipping invalid route %s: %v", route.ID, err)
+                                       continue
+                               }
+                               validRoutes = append(validRoutes, route)
+                       }
+
+                       parsed.Routes = validRoutes
+                       logger.Debugf("Valid routes after filtering: %+v", validRoutes)
+
+                       // Only fully decoded and filtered configurations become the result.
+                       routeConfig = parsed
+
+                       // Return if we have valid routes
+                       if len(validRoutes) > 0 {
+                               return routeConfig
+                       }
+               }
+       }
+
+       if len(invalidRouteIDs) > 0 {
+               logger.Warnf("No valid routes found in configuration: %v", invalidRouteIDs)
+       }
+       return routeConfig
+}
+
+// validateRoute validates a single route, returning an error if invalid.
+//
+// A route is valid when it is non-nil, defines at least one of Match.Prefix or
+// Match.Path, and names a target cluster in Route.Cluster.
+func validateRoute(route *model.Router) error {
+       // Guard against a nil entry so callers get an error instead of a panic.
+       if route == nil {
+               return errors.New("route is nil")
+       }
+
+       // Ensure route has a valid match condition
+       if route.Match.Prefix == "" && route.Match.Path == "" {
+               return errors.Errorf("route %s has no prefix or path defined", route.ID)
+       }
+
+       // Ensure cluster is specified
+       if route.Route.Cluster == "" {
+               return errors.Errorf("route %s has no cluster defined", route.ID)
+       }
+
+       return nil
+}
diff --git a/pkg/listener/http/http_listener.go 
b/pkg/listener/http/http_listener.go
index 6af98cb9..1cc75ae2 100644
--- a/pkg/listener/http/http_listener.go
+++ b/pkg/listener/http/http_listener.go
@@ -114,6 +114,7 @@ func (ls *HttpListenerService) httpsListener() {
 
        mux := http.NewServeMux()
        mux.HandleFunc("/", hl.ServeHTTP)
+
        m := &autocert.Manager{
                Cache:      
autocert.DirCache(ls.Config.Address.SocketAddress.CertsDir),
                Prompt:     autocert.AcceptTOS,
diff --git a/pkg/server/listener_manager.go b/pkg/server/listener_manager.go
index 1cdccda4..c4ebf94b 100644
--- a/pkg/server/listener_manager.go
+++ b/pkg/server/listener_manager.go
@@ -140,6 +140,9 @@ func (lm *ListenerManager) AddListener(lsConf 
*model.Listener) error {
 }
 
 func (lm *ListenerManager) UpdateListener(m *model.Listener) error {
+       if m == nil {
+               return errors.New("UpdateListener error: provided listener 
config is nil")
+       }
        // lock
        lm.rwLock.Lock()
        defer lm.rwLock.Unlock()
diff --git a/pkg/server/router_manager.go b/pkg/server/router_manager.go
index 149f7262..9301f532 100644
--- a/pkg/server/router_manager.go
+++ b/pkg/server/router_manager.go
@@ -17,6 +17,10 @@
 
 package server
 
+import (
+       "github.com/pkg/errors"
+)
+
 import (
        "github.com/apache/dubbo-go-pixiu/pkg/logger"
        "github.com/apache/dubbo-go-pixiu/pkg/model"
@@ -55,3 +59,58 @@ func (rm *RouterManager) DeleteRouter(r *model.Router) {
                l.OnDeleteRouter(r)
        }
 }
+
+// UpdateRoutes replaces oldRoutes with newRoutes by notifying every registered
+// router listener.
+//
+// newRoutes are validated first; if validation fails no listener is notified
+// and the wrapped validation error is returned. Otherwise each listener
+// receives OnDeleteRouter for every old route, followed by OnAddRouter for
+// every new route.
+//
+// The delete and add notifications are issued one after another; the
+// replacement is not atomic.
+func (rm *RouterManager) UpdateRoutes(oldRoutes []*model.Router, newRoutes []*model.Router) error {
+       logger.Infof("Starting route update with %d new routes", len(newRoutes))
+
+       // Validate new routes
+       if err := validateRoutes(newRoutes); err != nil {
+               logger.Errorf("Invalid routes provided: %v", err)
+               return errors.Wrap(err, "route validation failed")
+       }
+
+       // Notify listeners to delete existing routes
+       for _, route := range oldRoutes {
+               // Old routes are not validated, so skip nil entries rather than
+               // dereferencing them.
+               if route == nil {
+                       continue
+               }
+               logger.Debugf("Notifying listeners to delete route: %s", route.String())
+               for _, listener := range rm.rls {
+                       listener.OnDeleteRouter(route)
+               }
+       }
+
+       // Notify listeners to add new routes
+       for _, route := range newRoutes {
+               logger.Debugf("Notifying listeners to add route: %s", route.String())
+               for _, listener := range rm.rls {
+                       listener.OnAddRouter(route)
+               }
+       }
+
+       logger.Infof("Routes updated successfully with %d routes", len(newRoutes))
+
+       return nil
+}
+
+// validateRoutes performs basic validation on the provided routes.
+func validateRoutes(routes []*model.Router) error {
+       routeIDs := make(map[string]struct{}, len(routes))
+       for _, route := range routes {
+               // Check for duplicate IDs
+               if _, exists := routeIDs[route.ID]; exists {
+                       return errors.Errorf("duplicate route ID: %s", route.ID)
+               }
+               routeIDs[route.ID] = struct{}{}
+
+               // Ensure route has a valid match condition
+               if route.Match.Prefix == "" && route.Match.Path == "" {
+                       return errors.Errorf("route %s has no prefix or path 
defined", route.ID)
+               }
+
+               // Ensure cluster is specified
+               if route.Route.Cluster == "" {
+                       return errors.Errorf("route %s has no cluster defined", 
route.ID)
+               }
+       }
+       return nil
+}

Reply via email to