This is an automated email from the ASF dual-hosted git repository.
marsevilspirit pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/develop by this push:
new e57596571 Feat-conf hot-loading capacity (#2992)
e57596571 is described below
commit e5759657145f441f879524e444d3b211779eb363
Author: Wiggins <[email protected]>
AuthorDate: Sun Nov 2 20:38:49 2025 +0800
Feat-conf hot-loading capacity (#2992)
* feat conf hot-laod
* feat conf hot-laod #2925
* update fmt #2925
* add gosafly #2925
* update #2925
* #2925
* #2925
* Delete tools/dubbogo-cli/cmd/testGenCode/template/newApp/go.sum
* Delete tools/dubbogo-cli/cmd/testGenCode/template/newDemo/go.sum
* update fmt and review advice #2925
* update add test #2925
* update #2925
* update #2925
* update #2925
---------
Co-authored-by: 吴孝宇 <[email protected]>
---
loader.go | 181 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
loader_test.go | 110 +++++++++++++++++++++++++++++++++++
2 files changed, 289 insertions(+), 2 deletions(-)
diff --git a/loader.go b/loader.go
index fa5cf5ab6..fd54c963b 100644
--- a/loader.go
+++ b/loader.go
@@ -20,12 +20,17 @@ package dubbo
import (
"os"
"path/filepath"
+ "reflect"
"runtime"
"strings"
+ "sync"
)
import (
"github.com/dubbogo/gost/log/logger"
+ gr "github.com/dubbogo/gost/runtime"
+
+ "github.com/fsnotify/fsnotify"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/json"
@@ -39,13 +44,28 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/constant/file"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
)
var (
- defaultActive = "default"
- instanceOptions = defaultInstanceOptions()
+ defaultActive = "default"
+ instanceOptions = defaultInstanceOptions()
+ instanceOptionsMutex sync.Mutex
+ once sync.Once
+ stopOnce sync.Once
)
+// fileWatcher manages the file watching state and concurrency control
+type fileWatcher struct {
+ stopCh chan struct{}
+ watcherWg sync.WaitGroup
+ mu sync.Mutex
+}
+
+var watcher = &fileWatcher{
+ stopCh: make(chan struct{}),
+}
+
func Load(opts ...LoaderConfOption) error {
conf := NewLoaderConf(opts...)
if conf.opts == nil {
@@ -64,9 +84,93 @@ func Load(opts ...LoaderConfOption) error {
}
instance := &Instance{insOpts: instanceOptions}
+ // start the file watcher
+ once.Do(func() {
+ watcher.watcherWg.Add(1)
+ gr.GoSafely(&watcher.watcherWg, false, func() {
+ watch(conf, watcher.stopCh)
+ }, nil)
+ extension.AddCustomShutdownCallback(func() {
+ StopFileWatcher()
+ })
+ })
return instance.start()
}
+func watch(conf *loaderConf, stopCh <-chan struct{}) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ logger.Errorf("Failed to initialize file watcher, error: %v", err)
+ return
+ }
+ defer watcher.Close()
+
+ err = watcher.Add(conf.path)
+ if err != nil {
+ logger.Errorf("Failed to add file %s to watcher, error: %v", conf.path, err)
+ return
+ }
+
+ for {
+ select {
+ case <-stopCh:
+ logger.Infof("File watcher is stopping...")
+ return
+ case event := <-watcher.Events:
+ if event.Op&fsnotify.Write == fsnotify.Write {
+ logger.Infof("Configuration file %s updated, initiating hot reload...", event.Name)
+ if err := hotUpdateConfig(conf); err != nil {
+ logger.Warnf("Hot reload of configuration failed, error: %v", err)
+ }
+ }
+ case err := <-watcher.Errors:
+ logger.Warnf("File watcher encountered an error: %v", err)
+ }
+ }
+}
+
+func hotUpdateConfig(conf *loaderConf) error {
+ newOpts := defaultInstanceOptions()
+
+ newBytes, err := os.ReadFile(conf.path)
+ if err != nil {
+ return err
+ }
+ oldBytes := conf.bytes
+
+ oldKoan := buildKoanfFromBytes(conf, oldBytes)
+ newKoan := buildKoanfFromBytes(conf, newBytes)
+
+ if !safeChanged(oldKoan, newKoan) {
+ logger.Warnf("Hot reload denied: changes outside allowed hot-reload keys detected")
+ return errors.New("hot reload denied: disallowed configuration changes detected")
+ }
+
+ conf.bytes = newBytes
+
+ koan := newKoan
+ if err := koan.UnmarshalWithConf(newOpts.Prefix(), newOpts, koanf.UnmarshalConf{Tag: "yaml"}); err != nil {
+ return err
+ }
+
+ if err := newOpts.init(); err != nil {
+ return err
+ }
+
+ // Any part of the application that accesses instanceOptions directly will now use the new values.
+ instanceOptionsMutex.Lock()
+ instanceOptions = newOpts
+ instanceOptionsMutex.Unlock()
+
+ // Explicitly update logger level after hot reload
+ if ok := logger.SetLoggerLevel(instanceOptions.Logger.Level); !ok {
+ logger.Warnf("Failed to update logger level after hot reload. Logger may not support dynamic level changes.")
+ }
+
+ logger.Infof("Configuration hot reload completed successfully!")
+ return nil
+}
+
type loaderConf struct {
suffix string // loaderConf file extension default yaml
path string // loaderConf file path default ./conf/dubbogo.yaml
@@ -336,3 +440,76 @@ func checkPlaceholder(s string) (newKey, defaultValue string) {
return
}
+
+// StopFileWatcher Stop file listener
+func StopFileWatcher() {
+ logger.Info("Stopping file watcher...")
+ stopOnce.Do(func() {
+ watcher.mu.Lock()
+ defer watcher.mu.Unlock()
+ close(watcher.stopCh)
+ })
+ watcher.watcherWg.Wait()
+ logger.Info("File watcher stopped successfully")
+}
+
+func buildKoanfFromBytes(conf *loaderConf, b []byte) *koanf.Koanf {
+ c := *conf
+ c.bytes = b
+ k := GetConfigResolver(&c)
+ return c.MergeConfig(k)
+}
+
+var hotReloadAllowedPredicates = []func(string) bool{
+ func(k string) bool { return strings.Contains(k, ".logger.") },
+}
+
+func AllowHotReloadPrefix(prefix string) {
+ hotReloadAllowedPredicates = append(hotReloadAllowedPredicates, func(k string) bool { return strings.HasPrefix(k, prefix) })
+}
+
+func AllowHotReloadContains(substr string) {
+ hotReloadAllowedPredicates = append(hotReloadAllowedPredicates, func(k string) bool { return strings.Contains(k, substr) })
+}
+
+func AllowHotReloadExact(key string) {
+ hotReloadAllowedPredicates = append(hotReloadAllowedPredicates, func(k string) bool { return k == key })
+}
+
+func isAllowedKey(key string) bool {
+ for _, p := range hotReloadAllowedPredicates {
+ if p(key) {
+ return true
+ }
+ }
+ return false
+}
+
+func safeChanged(oldK, newK *koanf.Koanf) bool {
+ oldAll := oldK.All()
+ newAll := newK.All()
+
+ keys := make(map[string]struct{}, len(oldAll)+len(newAll))
+ for k := range oldAll {
+ keys[k] = struct{}{}
+ }
+ for k := range newAll {
+ keys[k] = struct{}{}
+ }
+
+ for k := range keys {
+
+ if isAllowedKey(k) {
+ continue
+ }
+ ov, oOk := oldAll[k]
+ nv, nOk := newAll[k]
+ if oOk != nOk {
+ return false
+ }
+ if !reflect.DeepEqual(ov, nv) {
+ return false
+ }
+ }
+ return true
+}
diff --git a/loader_test.go b/loader_test.go
new file mode 100644
index 000000000..755c5a8c8
--- /dev/null
+++ b/loader_test.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dubbo
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+func writeFile(t *testing.T, dir, name, content string) string {
+ t.Helper()
+ p := filepath.Join(dir, name)
+ if err := os.WriteFile(p, []byte(content), 0o600); err != nil {
+ t.Fatalf("write file: %v", err)
+ }
+ return p
+}
+
+func TestHotUpdateConfig_AllowsLoggerLevelChange(t *testing.T) {
+ // snapshot globals we touch and restore afterwards
+ prevIns := instanceOptions
+ defer func() { instanceOptions = prevIns }()
+
+ tmp := t.TempDir()
+ base := "dubbo:\n logger:\n level: info\n"
+ updated := "dubbo:\n logger:\n level: debug\n"
+
+ path := writeFile(t, tmp, "conf.yaml", base)
+ conf := NewLoaderConf(WithPath(path))
+
+ // overwrite the file with the updated content
+ if err := os.WriteFile(path, []byte(updated), 0o600); err != nil {
+ t.Fatalf("overwrite file: %v", err)
+ }
+
+ if err := hotUpdateConfig(conf); err != nil {
+ t.Fatalf("hotUpdateConfig unexpected error: %v", err)
+ }
+ if got := instanceOptions.Logger.Level; got != "debug" {
+ t.Fatalf("logger level not updated, want=debug got=%s", got)
+ }
+}
+
+func TestHotUpdateConfig_DeniesDisallowedChange(t *testing.T) {
+ // snapshot globals we touch and restore afterwards
+ prevIns := instanceOptions
+ defer func() { instanceOptions = prevIns }()
+
+ tmp := t.TempDir()
+ base := "dubbo:\n application:\n name: app1\n"
+ updated := "dubbo:\n application:\n name: app2\n"
+
+ path := writeFile(t, tmp, "conf.yaml", base)
+ conf := NewLoaderConf(WithPath(path))
+
+ // ensure a known baseline value in globals
+ instanceOptions.Application.Name = "baseline"
+
+ if err := os.WriteFile(path, []byte(updated), 0o600); err != nil {
+ t.Fatalf("overwrite file: %v", err)
+ }
+
+ if err := hotUpdateConfig(conf); err == nil {
+ t.Fatalf("expected error for disallowed change, got nil")
+ }
+ if got := instanceOptions.Application.Name; got != "baseline" {
+ t.Fatalf("instanceOptions changed unexpectedly, want=baseline got=%s", got)
+ }
+}
+
+func TestHotUpdateConfig_AllowsWithCustomPrefix(t *testing.T) {
+ // snapshot globals and hot-reload predicates
+ prevIns := instanceOptions
+ prevPreds := hotReloadAllowedPredicates
+ defer func() { instanceOptions = prevIns; hotReloadAllowedPredicates = prevPreds }()
+
+ tmp := t.TempDir()
+ base := "dubbo:\n application:\n name: app1\n"
+ updated := "dubbo:\n application:\n name: app2\n"
+
+ path := writeFile(t, tmp, "conf.yaml", base)
+ conf := NewLoaderConf(WithPath(path))
+
+ // allow changing any key under dubbo.application.*
+ AllowHotReloadPrefix("dubbo.application.")
+
+ if err := os.WriteFile(path, []byte(updated), 0o600); err != nil {
+ t.Fatalf("overwrite file: %v", err)
+ }
+
+ if err := hotUpdateConfig(conf); err != nil {
+ t.Fatalf("hotUpdateConfig unexpected error with allowed prefix: %v", err)
+ }
+}