This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 67d03a54 [navi] discovery xds cachelogic (#761)
67d03a54 is described below
commit 67d03a54bd72849d7c97a43ffc60faf4d967cc58
Author: Jian Zhong <[email protected]>
AuthorDate: Thu Jul 31 14:41:14 2025 +0800
[navi] discovery xds cachelogic (#761)
---
navigator/pkg/bootstrap/server.go | 12 +-
navigator/pkg/features/navigator.go | 12 +
navigator/pkg/features/xds.go | 35 ++
.../pkg/{bootstrap/server.go => model/context.go} | 28 +-
navigator/pkg/model/typed_xds_cache.go | 18 +
navigator/pkg/model/xds_cache.go | 29 ++
pkg/cmd/cmd.go | 17 +
pkg/env/var.go | 405 +++++++++++++++++++++
8 files changed, 549 insertions(+), 7 deletions(-)
diff --git a/navigator/pkg/bootstrap/server.go
b/navigator/pkg/bootstrap/server.go
index c4d55457..2ca21681 100644
--- a/navigator/pkg/bootstrap/server.go
+++ b/navigator/pkg/bootstrap/server.go
@@ -17,11 +17,21 @@
package bootstrap
+import "github.com/apache/dubbo-kubernetes/navigator/pkg/model"
+
type Server struct {
+ environment *model.Environment
}
func NewServer(args *NaviArgs, initFuncs ...func(*Server)) (*Server, error) {
- return nil, nil
+ e := model.NewEnvironment()
+ s := &Server{
+ environment: e,
+ }
+ for _, fn := range initFuncs {
+ fn(s)
+ }
+ return s, nil
}
func (s *Server) Start(stop <-chan struct{}) error {
diff --git a/navigator/pkg/features/navigator.go
b/navigator/pkg/features/navigator.go
new file mode 100644
index 00000000..1c21a165
--- /dev/null
+++ b/navigator/pkg/features/navigator.go
@@ -0,0 +1,12 @@
+package features
+
+import "github.com/apache/dubbo-kubernetes/pkg/env"
+
+var (
+ EnableUnsafeAssertions = env.Register(
+ "UNSAFE_NAVIGATOR_ENABLE_RUNTIME_ASSERTIONS",
+ false,
+ "If enabled, addition runtime asserts will be performed. "+
+ "These checks are both expensive and panic on failure.
As a result, this should be used only for testing.",
+ ).Get()
+)
diff --git a/navigator/pkg/features/xds.go b/navigator/pkg/features/xds.go
new file mode 100644
index 00000000..ec8e0cee
--- /dev/null
+++ b/navigator/pkg/features/xds.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package features
+
+import "github.com/apache/dubbo-kubernetes/pkg/env"
+
+var (
+ EnableXDSCaching = env.Register("NAVIGATOR_ENABLE_XDS_CACHE", true,
+ "If true, Navigator will cache XDS responses.").Get()
+
+ // EnableCDSCaching determines if CDS caching is enabled. This is
explicitly split out of ENABLE_XDS_CACHE,
+ // so that in case there are issues with the CDS cache we can just
disable the CDS cache.
+ EnableCDSCaching = env.Register("PILOT_ENABLE_CDS_CACHE", true,
+ "If true, Pilot will cache CDS responses. Note: this depends on
PILOT_ENABLE_XDS_CACHE.").Get()
+
+ // EnableRDSCaching determines if RDS caching is enabled. This is
explicitly split out of ENABLE_XDS_CACHE,
+ // so that in case there are issues with the RDS cache we can just
disable the RDS cache.
+ EnableRDSCaching = env.Register("PILOT_ENABLE_RDS_CACHE", true,
+ "If true, Pilot will cache RDS responses. Note: this depends on
PILOT_ENABLE_XDS_CACHE.").Get()
+)
diff --git a/navigator/pkg/bootstrap/server.go b/navigator/pkg/model/context.go
similarity index 63%
copy from navigator/pkg/bootstrap/server.go
copy to navigator/pkg/model/context.go
index c4d55457..959517c6 100644
--- a/navigator/pkg/bootstrap/server.go
+++ b/navigator/pkg/model/context.go
@@ -15,15 +15,31 @@
* limitations under the License.
*/
-package bootstrap
+package model
-type Server struct {
+import (
+ "github.com/apache/dubbo-kubernetes/navigator/pkg/features"
+)
+
+type Environment struct {
+ Cache XdsCache
}
-func NewServer(args *NaviArgs, initFuncs ...func(*Server)) (*Server, error) {
- return nil, nil
+type XdsCacheImpl struct {
+ cds typedXdsCache[uint64]
+ eds typedXdsCache[uint64]
+ rds typedXdsCache[uint64]
+ sds typedXdsCache[string]
}
-func (s *Server) Start(stop <-chan struct{}) error {
- return nil
+func NewEnvironment() *Environment {
+ var cache XdsCache
+ if features.EnableXDSCaching {
+ cache = NewXdsCache()
+ } else {
+ cache = DisabledCache{}
+ }
+ return &Environment{
+ Cache: cache,
+ }
}
diff --git a/navigator/pkg/model/typed_xds_cache.go
b/navigator/pkg/model/typed_xds_cache.go
new file mode 100644
index 00000000..7825e44e
--- /dev/null
+++ b/navigator/pkg/model/typed_xds_cache.go
@@ -0,0 +1,18 @@
+package model
+
+type typedXdsCache[K comparable] interface {
+}
+
+type lruCache[K comparable] struct {
+}
+
+var _ typedXdsCache[uint64] = &lruCache[uint64]{}
+
+func newTypedXdsCache[K comparable]() typedXdsCache[K] {
+ cache := &lruCache[K]{}
+ return cache
+}
+
+type disabledCache[K comparable] struct{}
+
+var _ typedXdsCache[uint64] = &disabledCache[uint64]{}
diff --git a/navigator/pkg/model/xds_cache.go b/navigator/pkg/model/xds_cache.go
new file mode 100644
index 00000000..ae74659b
--- /dev/null
+++ b/navigator/pkg/model/xds_cache.go
@@ -0,0 +1,29 @@
+package model
+
+import (
+ "github.com/apache/dubbo-kubernetes/navigator/pkg/features"
+)
+
+type XdsCache interface{}
+
+type DisabledCache struct{}
+
+func NewXdsCache() XdsCache {
+ cache := XdsCacheImpl{
+ eds: newTypedXdsCache[uint64](),
+ }
+ if features.EnableCDSCaching {
+ cache.cds = newTypedXdsCache[uint64]()
+ } else {
+ cache.cds = disabledCache[uint64]{}
+ }
+ if features.EnableRDSCaching {
+ cache.rds = newTypedXdsCache[uint64]()
+ } else {
+ cache.rds = disabledCache[uint64]{}
+ }
+
+ cache.sds = newTypedXdsCache[string]()
+
+ return cache
+}
diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index ab77cf8d..9b42d451 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package cmd
import (
diff --git a/pkg/env/var.go b/pkg/env/var.go
new file mode 100644
index 00000000..d41ec3f2
--- /dev/null
+++ b/pkg/env/var.go
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package env
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "sort"
+ "strconv"
+ "sync"
+ "time"
+)
+
+// The type of a variable's value
+type VarType byte
+
+const (
+ // Variable holds a free-form string.
+ STRING VarType = iota
+ // Variable holds a boolean value.
+ BOOL
+ // Variable holds a signed integer.
+ INT
+ // Variables holds a floating point value.
+ FLOAT
+ // Variable holds a time duration.
+ DURATION
+ // Variable holds a dynamic unknown type.
+ OTHER
+)
+
+// Var describes a single environment variable
+type Var struct {
+ // The name of the environment variable.
+ Name string
+
+ // The optional default value of the environment variable.
+ DefaultValue string
+
+ // Description of the environment variable's purpose.
+ Description string
+
+ // Hide the existence of this variable when outputting usage
information.
+ Hidden bool
+
+ // Mark this variable as deprecated when generating usage information.
+ Deprecated bool
+
+ // The type of the variable's value
+ Type VarType
+
+ // The underlying Go type of the variable
+ GoType string
+}
+
+// StringVar represents a single string environment variable.
+type StringVar struct {
+ Var
+}
+
+// BoolVar represents a single boolean environment variable.
+type BoolVar struct {
+ Var
+}
+
+// IntVar represents a single integer environment variable.
+type IntVar struct {
+ Var
+}
+
+// FloatVar represents a single floating-point environment variable.
+type FloatVar struct {
+ Var
+}
+
+// DurationVar represents a single duration environment variable.
+type DurationVar struct {
+ Var
+}
+
+var (
+ allVars = make(map[string]Var)
+ mutex sync.Mutex
+)
+
+// VarDescriptions returns a description of this process' environment
variables, sorted by name.
+func VarDescriptions() []Var {
+ mutex.Lock()
+ sorted := make([]Var, 0, len(allVars))
+ for _, v := range allVars {
+ sorted = append(sorted, v)
+ }
+ mutex.Unlock()
+
+ sort.Slice(sorted, func(i, j int) bool {
+ return sorted[i].Name < sorted[j].Name
+ })
+
+ return sorted
+}
+
+type Parseable interface {
+ comparable
+}
+
+type GenericVar[T Parseable] struct {
+ Var
+ delegate specializedVar[T]
+}
+
+func Register[T Parseable](name string, defaultValue T, description string)
GenericVar[T] {
+ // Specialized cases
+ // In the future, once only Register() remains, we can likely drop most
of these.
+ // however, time.Duration is needed still as it doesn't implement json
+ switch d := any(defaultValue).(type) {
+ case time.Duration:
+ v := RegisterDurationVar(name, d, description)
+ return GenericVar[T]{v.Var, any(v).(specializedVar[T])}
+ case string:
+ v := RegisterStringVar(name, d, description)
+ return GenericVar[T]{v.Var, any(v).(specializedVar[T])}
+ case float64:
+ v := RegisterFloatVar(name, d, description)
+ return GenericVar[T]{v.Var, any(v).(specializedVar[T])}
+ case int:
+ v := RegisterIntVar(name, d, description)
+ return GenericVar[T]{v.Var, any(v).(specializedVar[T])}
+ case bool:
+ v := RegisterBoolVar(name, d, description)
+ return GenericVar[T]{v.Var, any(v).(specializedVar[T])}
+ }
+ b, _ := json.Marshal(defaultValue)
+ v := Var{Name: name, DefaultValue: string(b), Description: description,
Type: STRING, GoType: fmt.Sprintf("%T", defaultValue)}
+ RegisterVar(v)
+ return GenericVar[T]{getVar(name), nil}
+}
+
+// RegisterStringVar registers a new string environment variable.
+func RegisterStringVar(name string, defaultValue string, description string)
StringVar {
+ v := Var{Name: name, DefaultValue: defaultValue, Description:
description, Type: STRING}
+ RegisterVar(v)
+ return StringVar{getVar(name)}
+}
+
+// RegisterBoolVar registers a new boolean environment variable.
+func RegisterBoolVar(name string, defaultValue bool, description string)
BoolVar {
+ v := Var{Name: name, DefaultValue: strconv.FormatBool(defaultValue),
Description: description, Type: BOOL}
+ RegisterVar(v)
+ return BoolVar{getVar(name)}
+}
+
+// RegisterIntVar registers a new integer environment variable.
+func RegisterIntVar(name string, defaultValue int, description string) IntVar {
+ v := Var{Name: name, DefaultValue:
strconv.FormatInt(int64(defaultValue), 10), Description: description, Type: INT}
+ RegisterVar(v)
+ return IntVar{getVar(name)}
+}
+
+// RegisterFloatVar registers a new floating-point environment variable.
+func RegisterFloatVar(name string, defaultValue float64, description string)
FloatVar {
+ v := Var{Name: name, DefaultValue: strconv.FormatFloat(defaultValue,
'G', -1, 64), Description: description, Type: FLOAT}
+ RegisterVar(v)
+ return FloatVar{v}
+}
+
+// RegisterDurationVar registers a new duration environment variable.
+func RegisterDurationVar(name string, defaultValue time.Duration, description
string) DurationVar {
+ v := Var{Name: name, DefaultValue: defaultValue.String(), Description:
description, Type: DURATION}
+ RegisterVar(v)
+ return DurationVar{getVar(name)}
+}
+
+// RegisterVar registers a generic environment variable.
+func RegisterVar(v Var) {
+ mutex.Lock()
+
+ if old, ok := allVars[v.Name]; ok {
+ if v.Description != "" {
+ allVars[v.Name] = v // last one with a description wins
if the same variable name is registered multiple times
+ }
+
+ if old.Description != v.Description || old.DefaultValue !=
v.DefaultValue || old.Type != v.Type || old.Deprecated != v.Deprecated ||
old.Hidden != v.Hidden {
+ fmt.Printf("The environment variable %s was registered
multiple times using different metadata: %v, %v", v.Name, old, v)
+ }
+ } else {
+ allVars[v.Name] = v
+ }
+
+ mutex.Unlock()
+}
+
+func getVar(name string) Var {
+ mutex.Lock()
+ result := allVars[name]
+ mutex.Unlock()
+
+ return result
+}
+
+// Get retrieves the value of the environment variable.
+// It returns the value, which will be the default if the variable is not
present.
+// To distinguish between an empty value and an unset value, use Lookup.
+func (v StringVar) Get() string {
+ result, _ := v.Lookup()
+ return result
+}
+
+// Lookup retrieves the value of the environment variable. If the
+// variable is present in the environment the
+// value (which may be empty) is returned and the boolean is true.
+// Otherwise the returned value will be the default and the boolean will
+// be false.
+func (v StringVar) Lookup() (string, bool) {
+ result, ok := os.LookupEnv(v.Name)
+ if !ok {
+ result = v.DefaultValue
+ }
+
+ return result, ok
+}
+
+// Get retrieves the value of the environment variable.
+// It returns the value, which will be the default if the variable is not
present.
+// To distinguish between an empty value and an unset value, use Lookup.
+func (v BoolVar) Get() bool {
+ result, _ := v.Lookup()
+ return result
+}
+
+// Lookup retrieves the value of the environment variable. If the
+// variable is present in the environment the
+// value (which may be empty) is returned and the boolean is true.
+// Otherwise the returned value will be the default and the boolean will
+// be false.
+func (v BoolVar) Lookup() (bool, bool) {
+ result, ok := os.LookupEnv(v.Name)
+ if !ok {
+ result = v.DefaultValue
+ }
+
+ b, err := strconv.ParseBool(result)
+ if err != nil {
+ fmt.Printf("Invalid environment variable value `%s`, expecting
true/false, defaulting to %v", result, v.DefaultValue)
+ b, _ = strconv.ParseBool(v.DefaultValue)
+ }
+
+ return b, ok
+}
+
+// Get retrieves the value of the environment variable.
+// It returns the value, which will be the default if the variable is not
present.
+// To distinguish between an empty value and an unset value, use Lookup.
+func (v IntVar) Get() int {
+ result, _ := v.Lookup()
+ return result
+}
+
+// Lookup retrieves the value of the environment variable. If the
+// variable is present in the environment the
+// value (which may be empty) is returned and the boolean is true.
+// Otherwise the returned value will be the default and the boolean will
+// be false.
+func (v IntVar) Lookup() (int, bool) {
+ result, ok := os.LookupEnv(v.Name)
+ if !ok {
+ result = v.DefaultValue
+ }
+
+ i, err := strconv.Atoi(result)
+ if err != nil {
+ fmt.Printf("Invalid environment variable value `%s`, expecting
an integer, defaulting to %v", result, v.DefaultValue)
+ i, _ = strconv.Atoi(v.DefaultValue)
+ }
+
+ return i, ok
+}
+
+// Get retrieves the value of the environment variable.
+// It returns the value, which will be the default if the variable is not
present.
+// To distinguish between an empty value and an unset value, use Lookup.
+func (v FloatVar) Get() float64 {
+ result, _ := v.Lookup()
+ return result
+}
+
+// Lookup retrieves the value of the environment variable. If the
+// variable is present in the environment the
+// value (which may be empty) is returned and the boolean is true.
+// Otherwise the returned value will be the default and the boolean will
+// be false.
+func (v FloatVar) Lookup() (float64, bool) {
+ result, ok := os.LookupEnv(v.Name)
+ if !ok {
+ result = v.DefaultValue
+ }
+
+ f, err := strconv.ParseFloat(result, 64)
+ if err != nil {
+ fmt.Printf("Invalid environment variable value `%s`, expecting
a floating-point value, defaulting to %v", result, v.DefaultValue)
+ f, _ = strconv.ParseFloat(v.DefaultValue, 64)
+ }
+
+ return f, ok
+}
+
+// Get retrieves the value of the environment variable.
+// It returns the value, which will be the default if the variable is not
present.
+// To distinguish between an empty value and an unset value, use Lookup.
+func (v DurationVar) Get() time.Duration {
+ result, _ := v.Lookup()
+ return result
+}
+
+// Lookup retrieves the value of the environment variable. If the
+// variable is present in the environment the
+// value (which may be empty) is returned and the boolean is true.
+// Otherwise the returned value will be the default and the boolean will
+// be false.
+func (v DurationVar) Lookup() (time.Duration, bool) {
+ result, ok := os.LookupEnv(v.Name)
+ if !ok {
+ result = v.DefaultValue
+ }
+
+ d, err := time.ParseDuration(result)
+ if err != nil {
+ fmt.Printf("Invalid environment variable value `%s`, expecting
a duration, defaulting to %v", result, v.DefaultValue)
+ d, _ = time.ParseDuration(v.DefaultValue)
+ }
+
+ return d, ok
+}
+
+// Get retrieves the value of the environment variable.
+// It returns the value, which will be the default if the variable is not
present.
+// To distinguish between an empty value and an unset value, use Lookup.
+func (v GenericVar[T]) Get() T {
+ if v.delegate != nil {
+ return v.delegate.Get()
+ }
+ result, _ := v.Lookup()
+ return result
+}
+
+// Lookup retrieves the value of the environment variable. If the
+// variable is present in the environment the
+// value (which may be empty) is returned and the boolean is true.
+// Otherwise the returned value will be the default and the boolean will
+// be false.
+func (v GenericVar[T]) Lookup() (T, bool) {
+ if v.delegate != nil {
+ return v.delegate.Lookup()
+ }
+ result, ok := os.LookupEnv(v.Name)
+ if !ok {
+ result = v.DefaultValue
+ }
+
+ res := new(T)
+
+ if err := json.Unmarshal([]byte(result), res); err != nil {
+ fmt.Printf("Invalid environment variable value `%s` defaulting
to %v: %v", result, v.DefaultValue, err)
+ _ = json.Unmarshal([]byte(v.DefaultValue), res)
+ }
+
+ return *res, ok
+}
+
+func (v GenericVar[T]) IsSet() bool {
+ _, ok := v.Lookup()
+ return ok
+}
+
+func (v GenericVar[T]) GetName() string {
+ return v.Var.Name
+}
+
+// specializedVar represents a var that can Get/Lookup
+type specializedVar[T any] interface {
+ Lookup() (T, bool)
+ Get() T
+}
+
+// VariableInfo provides generic information about a variable. All Variables
implement this interface.
+// This is largely to workaround lack of covariance in Go.
+type VariableInfo interface {
+ GetName() string
+ IsSet() bool
+}