This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 9b93fccf feat: Support dynamic update of routing configuration (#658)
9b93fccf is described below
commit 9b93fccf1e28eca1483bf8e8f57129d29c8e0f4c
Author: 1kasa <[email protected]>
AuthorDate: Fri May 2 11:46:51 2025 +0800
feat: Support dynamic update of routing configuration (#658)
* Support dynamic update of routing configuration
* Support dynamic update of routing configuration
* fix: wrong commit
* fix: wrong commit
* fix: wrong commit
* fix: comment
* fix: comment
* handle ci error
* handle error
* handle ci error
* handle comment
* handle import
* feat testcase
---
pkg/common/router/trie/trie_test.go | 36 +++++++++
pkg/hotreload/hotreload.go | 3 +-
pkg/hotreload/logger.go | 10 +--
pkg/hotreload/route_reloader.go | 145 ++++++++++++++++++++++++++++++++++++
pkg/listener/http/http_listener.go | 1 +
pkg/server/listener_manager.go | 3 +
pkg/server/router_manager.go | 59 +++++++++++++++
7 files changed, 251 insertions(+), 6 deletions(-)
diff --git a/pkg/common/router/trie/trie_test.go
b/pkg/common/router/trie/trie_test.go
index 3b3aa40f..0e6a7080 100644
--- a/pkg/common/router/trie/trie_test.go
+++ b/pkg/common/router/trie/trie_test.go
@@ -206,3 +206,39 @@ func TestTrie_ParamMatch(t *testing.T) {
assert.Equal(t, "", node.GetBizInfo())
assert.True(t, ret)
}
+
+func TestTrieSoftDelete(t *testing.T) {
+ trie := NewTrie()
+
+ ret, _ := trie.Put("/a/b/c", "route1")
+ assert.True(t, ret)
+
+ ret, _ = trie.Put("/a/b/c/d", "route2")
+ assert.True(t, ret)
+
+ ret, _ = trie.Put("/a/b/e", "route3")
+ assert.True(t, ret)
+
+ node, _, ok := trie.Match("/a/b/c")
+ assert.True(t, ok)
+ assert.Equal(t, "route1", node.GetBizInfo())
+
+ _, err := trie.Remove("/a/b/c")
+ assert.NoError(t, err)
+
+ node, _, ok = trie.Match("/a/b/c")
+ assert.False(t, ok)
+ assert.Nil(t, node)
+
+ node, _, ok = trie.Match("/a/b/c/d")
+ assert.True(t, ok)
+ assert.Equal(t, "route2", node.GetBizInfo())
+
+ node, _, ok = trie.Match("/a/b/e")
+ assert.True(t, ok)
+ assert.Equal(t, "route3", node.GetBizInfo())
+
+ node, _, ok = trie.Match("/a/b/c/")
+ assert.False(t, ok)
+ assert.Nil(t, node)
+}
diff --git a/pkg/hotreload/hotreload.go b/pkg/hotreload/hotreload.go
index cd6acce5..133a234e 100644
--- a/pkg/hotreload/hotreload.go
+++ b/pkg/hotreload/hotreload.go
@@ -46,7 +46,7 @@ type Coordinator struct {
manager *config.ConfigManager // Configuration manager
}
-var coordinator = Coordinator{reloaders: []HotReloader{&LoggerReloader{}}}
+var coordinator = Coordinator{reloaders: []HotReloader{&LoggerReloader{},
&RouteReloader{}}}
// StartHotReload initializes the hot reload process.
// It should be called when the project starts, e.g., in cmd/gateway.go.
@@ -58,6 +58,7 @@ func StartHotReload(manager *config.ConfigManager, boot
*model.Bootstrap) {
coordinator.manager = manager
coordinator.boot = boot
+
go coordinator.HotReload()
}
diff --git a/pkg/hotreload/logger.go b/pkg/hotreload/logger.go
index 45430cf3..eb2a024f 100644
--- a/pkg/hotreload/logger.go
+++ b/pkg/hotreload/logger.go
@@ -44,25 +44,25 @@ func (r *LoggerReloader) CheckUpdate(oldConfig, newConfig
*model.Bootstrap) bool
oc.DisableCaller != nc.DisableCaller ||
oc.DisableStacktrace != nc.DisableStacktrace ||
oc.Encoding != nc.Encoding {
- return false
+ return true
}
// Check sampling configuration.
if !r.checkSampling(oc.Sampling, nc.Sampling) {
- return false
+ return true
}
// Check encoder configuration.
if !r.checkEncoderConfig(oc.EncoderConfig, nc.EncoderConfig) {
- return false
+ return true
}
// Check output paths.
if !equal(oc.OutputPaths, nc.OutputPaths) {
- return false
+ return true
}
- return true
+ return false
}
// HotReload applies the new logger configuration.
diff --git a/pkg/hotreload/route_reloader.go b/pkg/hotreload/route_reloader.go
new file mode 100644
index 00000000..53a1ac30
--- /dev/null
+++ b/pkg/hotreload/route_reloader.go
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hotreload
+
+import (
+ "encoding/json"
+)
+
+import (
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+ "github.com/apache/dubbo-go-pixiu/pkg/server"
+)
+
+// RouteReloader implements the HotReloader interface for reloading route
configurations.
+type RouteReloader struct{}
+
+// CheckUpdate compares the old and new route configurations to determine if a
reload is needed.
+func (r *RouteReloader) CheckUpdate(oldConfig, newConfig *model.Bootstrap)
bool {
+ oldRoutes := extractRoutes(oldConfig)
+ newRoutes := extractRoutes(newConfig)
+ // Compare the number of routes
+ if len(oldRoutes.Routes) != len(newRoutes.Routes) || oldRoutes.Dynamic
!= newRoutes.Dynamic {
+ return true
+ }
+
+ // Compare each route
+ for i := range newRoutes.Routes {
+ if oldRoutes.Routes[i].Match.Prefix !=
newRoutes.Routes[i].Match.Prefix ||
+ oldRoutes.Routes[i].Route.Cluster !=
newRoutes.Routes[i].Route.Cluster {
+ return true
+ }
+ }
+ return false
+}
+
+// HotReload applies the new route configuration.
+func (r *RouteReloader) HotReload(oldConfig, newConfig *model.Bootstrap) error
{
+ oldRoutes := extractRoutes(oldConfig)
+ newRoutes := extractRoutes(newConfig)
+
+ // Update routes in the RouterManager
+ err := server.GetRouterManager().UpdateRoutes(oldRoutes.Routes,
newRoutes.Routes)
+ if err != nil {
+ logger.Infof("Failed to Routes reloaded.")
+ return err
+ }
+
+ return nil
+}
+
+// extractRoutes extracts routes from the configuration by parsing the filters.
+func extractRoutes(config *model.Bootstrap) model.RouteConfiguration {
+ var (
+ routeConfig model.RouteConfiguration
+ invalidRouteIDs []string
+ )
+ for _, listener := range config.StaticResources.Listeners {
+ for _, filterChain := range listener.FilterChain.Filters {
+ if filterChain.Name ==
constant.HTTPConnectManagerFilter {
+ // Extract route_config
+ rawRouteConfig, ok :=
filterChain.Config["route_config"]
+ if !ok {
+ logger.Debugf("No route_config found in
filter chain: %+v", filterChain)
+ continue
+ }
+ logger.Debugf("Raw route_config: %+v",
rawRouteConfig)
+
+ // Convert route_config to JSON bytes
+ routeConfigBytes, err :=
json.Marshal(rawRouteConfig)
+ if err != nil {
+ logger.Errorf("Failed to marshal
route_config: %v", err)
+ continue
+ }
+
+ // Parse JSON bytes into
model.RouteConfiguration
+ if err := json.Unmarshal(routeConfigBytes,
&routeConfig); err != nil {
+ logger.Errorf("Failed to unmarshal
route_config: %v", err)
+ continue
+ }
+
+ logger.Debugf("Parsed route_config: %+v",
routeConfig)
+
+ // Validate and filter routes
+ validRoutes := make([]*model.Router, 0,
len(routeConfig.Routes))
+ for _, route := range routeConfig.Routes {
+ if err := validateRoute(route); err !=
nil {
+ invalidRouteIDs =
append(invalidRouteIDs, route.ID)
+ logger.Warnf("Skipping invalid
route %s: %v", route.ID, err)
+ continue
+ }
+ validRoutes = append(validRoutes, route)
+ }
+
+ routeConfig.Routes = validRoutes
+ logger.Debugf("Valid routes after filtering:
%+v", validRoutes)
+
+ // Return if we have valid routes
+ if len(validRoutes) > 0 {
+ return routeConfig
+ }
+ }
+ }
+ }
+
+ if len(invalidRouteIDs) > 0 {
+ logger.Warnf("No valid routes found in configuration: %v",
invalidRouteIDs)
+ }
+ return routeConfig
+}
+
+// validateRoute validates a single route, returning an error if invalid.
+func validateRoute(route *model.Router) error {
+ // Ensure route has a valid match condition
+ if route.Match.Prefix == "" && route.Match.Path == "" {
+ return errors.Errorf("route %s has no prefix or path defined",
route.ID)
+ }
+
+ // Ensure cluster is specified
+ if route.Route.Cluster == "" {
+ return errors.Errorf("route %s has no cluster defined",
route.ID)
+ }
+
+ return nil
+}
diff --git a/pkg/listener/http/http_listener.go
b/pkg/listener/http/http_listener.go
index 6af98cb9..1cc75ae2 100644
--- a/pkg/listener/http/http_listener.go
+++ b/pkg/listener/http/http_listener.go
@@ -114,6 +114,7 @@ func (ls *HttpListenerService) httpsListener() {
mux := http.NewServeMux()
mux.HandleFunc("/", hl.ServeHTTP)
+
m := &autocert.Manager{
Cache:
autocert.DirCache(ls.Config.Address.SocketAddress.CertsDir),
Prompt: autocert.AcceptTOS,
diff --git a/pkg/server/listener_manager.go b/pkg/server/listener_manager.go
index 1cdccda4..c4ebf94b 100644
--- a/pkg/server/listener_manager.go
+++ b/pkg/server/listener_manager.go
@@ -140,6 +140,9 @@ func (lm *ListenerManager) AddListener(lsConf
*model.Listener) error {
}
func (lm *ListenerManager) UpdateListener(m *model.Listener) error {
+ if m == nil {
+ return errors.New("UpdateListener error: provided listener
config is nil")
+ }
// lock
lm.rwLock.Lock()
defer lm.rwLock.Unlock()
diff --git a/pkg/server/router_manager.go b/pkg/server/router_manager.go
index 149f7262..9301f532 100644
--- a/pkg/server/router_manager.go
+++ b/pkg/server/router_manager.go
@@ -17,6 +17,10 @@
package server
+import (
+ "github.com/pkg/errors"
+)
+
import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
@@ -55,3 +59,58 @@ func (rm *RouterManager) DeleteRouter(r *model.Router) {
l.OnDeleteRouter(r)
}
}
+
+// UpdateRoutes updates the routes in RouterManager with the provided new
routes.
+func (rm *RouterManager) UpdateRoutes(oldRoutes []*model.Router, newRoutes
[]*model.Router) error {
+ logger.Infof("Starting route update with %d new routes", len(newRoutes))
+
+ // Validate new routes
+ if err := validateRoutes(newRoutes); err != nil {
+ logger.Errorf("Invalid routes provided: %v", err)
+ return errors.Wrap(err, "route validation failed")
+ }
+
+ // Notify listeners to delete existing routes
+ for _, route := range oldRoutes {
+ logger.Debugf("Notifying listeners to delete route: %s",
route.String())
+ for _, listener := range rm.rls {
+ listener.OnDeleteRouter(route)
+ }
+ }
+
+ // Notify listeners to add new routes
+ for _, route := range newRoutes {
+ logger.Debugf("Notifying listeners to add route: %s",
route.String())
+ for _, listener := range rm.rls {
+ listener.OnAddRouter(route)
+ }
+ }
+
+ // Atomically update the active configuration
+ logger.Infof("Routes updated successfully with %d routes",
len(newRoutes))
+
+ return nil
+}
+
+// validateRoutes performs basic validation on the provided routes.
+func validateRoutes(routes []*model.Router) error {
+ routeIDs := make(map[string]struct{}, len(routes))
+ for _, route := range routes {
+ // Check for duplicate IDs
+ if _, exists := routeIDs[route.ID]; exists {
+ return errors.Errorf("duplicate route ID: %s", route.ID)
+ }
+ routeIDs[route.ID] = struct{}{}
+
+ // Ensure route has a valid match condition
+ if route.Match.Prefix == "" && route.Match.Path == "" {
+ return errors.Errorf("route %s has no prefix or path
defined", route.ID)
+ }
+
+ // Ensure cluster is specified
+ if route.Route.Cluster == "" {
+ return errors.Errorf("route %s has no cluster defined",
route.ID)
+ }
+ }
+ return nil
+}