This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 50857d51 Update dubbo gateway agent code logic (#861)
50857d51 is described below
commit 50857d510635d8fdcc343e4dfe672b788fa6f101
Author: mfordjody <[email protected]>
AuthorDate: Wed Feb 4 18:15:24 2026 +0800
Update dubbo gateway agent code logic (#861)
---
dubbod/discovery/cmd/dubbo-agent/app/cmd.go | 8 +-
dubbod/discovery/docker/dockerfile.proxy | 2 +-
.../discovery/pkg/config/kube/crdclient/client.go | 154 +++---------------
go.mod | 3 +-
go.sum | 2 -
.../dubbo-discovery/files/grpc-agent.yaml | 3 +-
.../files/{gateway.yaml => kube-gateway.yaml} | 7 +-
.../dubbo-discovery/templates/deployment.yaml | 2 +-
.../templates/dubbd-injector-configmap.yaml | 4 +-
.../dubbo-control/dubbo-discovery/values.yaml | 2 +-
pkg/config/model.go | 70 +-------
pkg/dubboagent/agent.go | 51 +++---
pkg/dubboagent/xds_proxy.go | 180 +++++++++++----------
pkg/kube/inject/inject.go | 2 +-
pkg/kube/inject/webhook.go | 2 +-
pkg/{dubboagent => }/pixiu/agent.go | 53 ++----
pkg/pixiu/config.go | 64 ++++++++
pkg/{dubboagent => }/pixiu/converter.go | 103 +++---------
pkg/{dubboagent => }/pixiu/proxy.go | 33 ++--
samples/grpc-app/README.md | 4 +-
20 files changed, 276 insertions(+), 473 deletions(-)
diff --git a/dubbod/discovery/cmd/dubbo-agent/app/cmd.go
b/dubbod/discovery/cmd/dubbo-agent/app/cmd.go
index 61c328dd..e1e2bff8 100644
--- a/dubbod/discovery/cmd/dubbo-agent/app/cmd.go
+++ b/dubbod/discovery/cmd/dubbo-agent/app/cmd.go
@@ -20,9 +20,8 @@ import (
"context"
"errors"
"fmt"
- "net/netip"
-
"github.com/apache/dubbo-kubernetes/pkg/log"
+ "net/netip"
"github.com/apache/dubbo-kubernetes/dubbod/discovery/cmd/dubbo-agent/options"
"github.com/apache/dubbo-kubernetes/dubbod/discovery/pkg/util/network"
@@ -65,7 +64,7 @@ func NewRootCommand(sds dubboagent.SDSServiceFactory)
*cobra.Command {
func newProxyCommand(sds dubboagent.SDSServiceFactory) *cobra.Command {
return &cobra.Command{
Use: "proxy",
- Short: "xDS proxy agent",
+ Short: "XDS proxy agent",
FParseErrWhitelist: cobra.FParseErrWhitelist{
UnknownFlags: true,
},
@@ -99,6 +98,7 @@ func newProxyCommand(sds dubboagent.SDSServiceFactory)
*cobra.Command {
agentOptions := options.NewAgentOptions(&proxyArgs,
proxyConfig, sds)
agent := dubboagent.NewAgent(proxyConfig, agentOptions,
secOpts)
+
ctx, cancel :=
context.WithCancelCause(context.Background())
defer cancel(errors.New("application shutdown"))
defer agent.Close()
@@ -110,9 +110,7 @@ func newProxyCommand(sds dubboagent.SDSServiceFactory)
*cobra.Command {
if err != nil {
return err
}
-
wait()
-
return nil
},
}
diff --git a/dubbod/discovery/docker/dockerfile.proxy
b/dubbod/discovery/docker/dockerfile.proxy
index 5cde2f66..bab30727 100644
--- a/dubbod/discovery/docker/dockerfile.proxy
+++ b/dubbod/discovery/docker/dockerfile.proxy
@@ -15,6 +15,6 @@
FROM gcr.io/distroless/static:debug
COPY bin/dubbo-agent /usr/local/bin/dubbo-agent
-COPY bin/pixiugateway /usr/local/bin/pixiugateway
+COPY bin/pixiu-gateway /usr/local/bin/pixiu-gateway
USER 9999:9999
ENTRYPOINT ["/usr/local/bin/dubbo-agent"]
diff --git a/dubbod/discovery/pkg/config/kube/crdclient/client.go
b/dubbod/discovery/pkg/config/kube/crdclient/client.go
index 9904a4a6..6e6f7cb7 100644
--- a/dubbod/discovery/pkg/config/kube/crdclient/client.go
+++ b/dubbod/discovery/pkg/config/kube/crdclient/client.go
@@ -156,7 +156,7 @@ func (cl *Client) addCRD(name string, opts
krt.OptionsBuilder) {
cl.logger.Debugf("addCRD: adding CRD %q", name)
s, f := cl.schemasByCRDName[name]
if !f {
- cl.logger.Debugf("Added resource that we are not watching: %v",
name)
+ cl.logger.Debugf("added resource that we are not watching: %v",
name)
return
}
resourceGVK := s.GroupVersionKind()
@@ -201,23 +201,18 @@ func (cl *Client) addCRD(name string, opts
krt.OptionsBuilder) {
ObjectTransform: transform,
FieldSelector: fieldSelector,
}
- cl.logger.Debugf("created filter for %v (namespaceFilter=%v,
extraFilter=%v, fieldSelector=%v)", resourceGVK, namespaceFilter != nil,
extraFilter != nil, fieldSelector)
+ if resourceGVK == gvk.KubernetesGateway {
+ filter.ObjectFilter = kubetypes.ComposeFilters(namespaceFilter,
extraFilter)
+ }
var kc kclient.Untyped
if s.IsBuiltin() {
kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
} else {
- // For DestinationRule and VirtualService, we use Dynamic
client which returns unstructured objects
- // So we need to use DynamicInformer type to ensure the
informer expects unstructured objects
- informerType := kubetypes.StandardInformer
- if resourceGVK == gvk.DestinationRule || resourceGVK ==
gvk.VirtualService || resourceGVK == gvk.PeerAuthentication {
- informerType = kubetypes.DynamicInformer
- cl.logger.Debugf("using DynamicInformer for %v (uses
Dynamic client)", resourceGVK)
- }
kc = kclient.NewDelayedInformer[controllers.Object](
cl.client,
gvr,
- informerType,
+ kubetypes.StandardInformer,
filter,
)
}
@@ -226,58 +221,18 @@ func (cl *Client) addCRD(name string, opts
krt.OptionsBuilder) {
collection := krt.MapCollection(wrappedClient, func(obj
controllers.Object) config.Config {
cfg := translateFunc(obj)
cfg.Domain = cl.domainSuffix
- // Only log at Debug level to avoid spam, but keep it available
for diagnosis
- cl.logger.Debugf("MapCollection translating object %s/%s to
config for %v", obj.GetNamespace(), obj.GetName(), resourceGVK)
return cfg
}, opts.WithName("collection/"+resourceGVK.Kind)...)
index := krt.NewNamespaceIndex(collection)
- // Register a debug handler to track all events from the wrappedClient
(before MapCollection)
- // This helps diagnose if events are being filtered before reaching the
collection
- wrappedClientDebugHandler := wrappedClient.RegisterBatch(func(o
[]krt.Event[controllers.Object]) {
- if len(o) > 0 {
- cl.logger.Debugf("wrappedClient event detected for %v:
%d events", resourceGVK, len(o))
- for i, event := range o {
- var nameStr, nsStr string
- if event.New != nil {
- obj := *event.New
- nameStr = obj.GetName()
- nsStr = obj.GetNamespace()
- } else if event.Old != nil {
- obj := *event.Old
- nameStr = obj.GetName()
- nsStr = obj.GetNamespace()
- }
- cl.logger.Debugf("wrappedClient event[%d] %s
for %v (name=%s/%s)",
- i, event.Event, resourceGVK, nsStr,
nameStr)
- }
- }
- }, false)
- // Register a debug handler to track all events from the collection
- // This helps diagnose why new config changes might not trigger events
- // Use false to match Dubbo's implementation - only process future
events, not initial sync
- debugHandler := collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
- if len(o) > 0 {
- cl.logger.Debugf("collection event detected for %v: %d
events", resourceGVK, len(o))
- for i, event := range o {
- var nameStr, nsStr string
- if event.New != nil {
- nameStr = event.New.Name
- nsStr = event.New.Namespace
- } else if event.Old != nil {
- nameStr = event.Old.Name
- nsStr = event.Old.Namespace
- }
- cl.logger.Debugf("collection event[%d] %s for
%v (name=%s/%s)",
- i, event.Event, resourceGVK, nsStr,
nameStr)
- }
- }
- }, false)
cl.kinds[resourceGVK] = nsStore{
collection: collection,
index: index,
handlers: []krt.HandlerRegistration{
- wrappedClientDebugHandler,
- debugHandler,
+ collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
+ // for _, event := range o {
+ // incrementEvent(resourceGVK.Kind,
event.Event.String())
+ // }
+ }, false),
},
}
}
@@ -294,55 +249,23 @@ func (cl *Client) Schemas() collection.Schemas {
}
func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler
model.EventHandler) {
- cl.kindsMu.Lock()
- defer cl.kindsMu.Unlock()
-
- c, ok := cl.kinds[kind]
- if !ok {
- cl.logger.Warnf("unknown type: %s", kind)
- return
- }
-
- cl.logger.Debugf("Registering handler for %v", kind)
- handlerReg := c.collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
- cl.logger.Debugf("batch handler triggered for %v with %d
events", kind, len(o))
- for i, event := range o {
- var nameStr, nsStr string
- if event.New != nil {
- nameStr = event.New.Name
- nsStr = event.New.Namespace
- } else if event.Old != nil {
- nameStr = event.Old.Name
- nsStr = event.Old.Namespace
- }
- cl.logger.Debugf("processing event[%d] %s for %v
(name=%s/%s)",
- i, event.Event, kind, nsStr, nameStr)
- switch event.Event {
- case controllers.EventAdd:
- if event.New != nil {
+ if c, ok := cl.kind(kind); ok {
+ c.handlers = append(c.handlers,
c.collection.RegisterBatch(func(o []krt.Event[config.Config]) {
+ for _, event := range o {
+ switch event.Event {
+ case controllers.EventAdd:
handler(config.Config{}, *event.New,
model.Event(event.Event))
- } else {
- cl.logger.Warnf("EventAdd but event.New
is nil, skipping")
- }
- case controllers.EventUpdate:
- if event.Old != nil && event.New != nil {
+ case controllers.EventUpdate:
handler(*event.Old, *event.New,
model.Event(event.Event))
- } else {
- cl.logger.Warnf("EventUpdate but
event.Old or event.New is nil, skipping")
- }
- case controllers.EventDelete:
- if event.Old != nil {
+ case controllers.EventDelete:
handler(config.Config{}, *event.Old,
model.Event(event.Event))
- } else {
- cl.logger.Warnf("EventDelete but
event.Old is nil, skipping")
}
}
- }
- }, false)
- // Update handlers slice to keep reference (though not strictly
necessary for functionality)
- c.handlers = append(c.handlers, handlerReg)
- cl.kinds[kind] = c
- cl.logger.Debugf("Successfully registered handler for %v", kind)
+ }, false))
+ return
+ }
+
+ cl.logger.Warnf("unknown type: %s", kind)
}
func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string)
*config.Config {
@@ -421,43 +344,14 @@ func (cl *Client) Delete(typ config.GroupVersionKind,
name, namespace string, re
func (cl *Client) List(kind config.GroupVersionKind, namespace string)
[]config.Config {
h, f := cl.kind(kind)
if !f {
- cl.logger.Warnf("unknown kind %v", kind)
return nil
}
- // Check if collection is synced
- if !h.collection.HasSynced() {
- cl.logger.Warnf("collection for %v is not synced yet", kind)
- }
-
- var configs []config.Config
if namespace == metav1.NamespaceAll {
- // Get all configs from collection
- configs = h.collection.List()
- cl.logger.Debugf("found %d configs for %v (namespace=all,
synced=%v)",
- len(configs), kind, h.collection.HasSynced())
- if len(configs) > 0 {
- for i, cfg := range configs {
- cl.logger.Debugf("config[%d] %s/%s for %v", i,
cfg.Namespace, cfg.Name, kind)
- }
- } else {
- cl.logger.Debugf("collection returned 0 configs for %v
(synced=%v), this may indicate informer is not watching correctly or resources
are being filtered", kind, h.collection.HasSynced())
- }
- // Log collection type for diagnosis
- cl.logger.Debugf("collection type is %T, HasSynced=%v",
h.collection, h.collection.HasSynced())
- } else {
- configs = h.index.Lookup(namespace)
- cl.logger.Debugf("found %d configs for %v in namespace %s
(synced=%v)", len(configs), kind, namespace, h.collection.HasSynced())
- if len(configs) > 0 {
- for i, cfg := range configs {
- cl.logger.Debugf("config[%d] %s/%s for %v", i,
cfg.Namespace, cfg.Name, kind)
- }
- } else {
- cl.logger.Debugf("found 0 configs for %v in namespace
%s (synced=%v), checking if resources exist in cluster", kind, namespace,
h.collection.HasSynced())
- }
+ return h.collection.List()
}
- return configs
+ return h.index.Lookup(namespace)
}
func getObjectMetadata(config config.Config) metav1.ObjectMeta {
diff --git a/go.mod b/go.mod
index d77fed3e..44a22eab 100644
--- a/go.mod
+++ b/go.mod
@@ -58,13 +58,13 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/heroku/color v0.0.6
github.com/howardjohn/celpp v0.1.0
- github.com/miekg/dns v1.1.68
github.com/moby/term v0.5.2
github.com/ory/viper v1.7.5
github.com/pkg/errors v0.9.1
github.com/sashabaranov/go-openai v1.40.5
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.7
+ github.com/stoewer/go-strcase v1.3.0
github.com/tmc/langchaingo v0.1.13
go.uber.org/atomic v1.11.0
golang.org/x/crypto v0.41.0
@@ -241,7 +241,6 @@ require (
github.com/spf13/afero v1.14.0 // indirect
github.com/spf13/cast v1.8.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
- github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/ulikunitz/xz v0.5.12 // indirect
github.com/vbatts/tar-split v0.12.1 // indirect
diff --git a/go.sum b/go.sum
index 46c095d4..4b60af9f 100644
--- a/go.sum
+++ b/go.sum
@@ -484,8 +484,6 @@ github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b
h1:j7+1HpAFS1zy5+Q4qx1f
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod
h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/microcosm-cc/bluemonday v1.0.26
h1:xbqSvqzQMeEHCqMi64VAs4d8uy6Mequs3rQ0k/Khz58=
github.com/microcosm-cc/bluemonday v1.0.26/go.mod
h1:JyzOCs9gkyQyjs+6h10UEVSe02CGwkhd72Xdqh78TWs=
-github.com/miekg/dns v1.1.68 h1:jsSRkNozw7G/mnmXULynzMNIsgY2dHC8LO6U6Ij2JEA=
-github.com/miekg/dns v1.1.68/go.mod
h1:fujopn7TB3Pu3JM69XaawiU0wqjpL9/8xGop5UrTPps=
github.com/mitchellh/copystructure v1.2.0
h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod
h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-homedir v1.1.0
h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
index ced62f84..744dee17 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
@@ -38,8 +38,7 @@ spec:
postStart:
exec:
command:
-# - dubbo-agent
- - planet-agent
+ - dubbo-agent
- wait
- --url=http://localhost:15020/healthz/ready
readinessProbe:
diff --git a/manifests/charts/dubbo-control/dubbo-discovery/files/gateway.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/files/kube-gateway.yaml
similarity index 95%
rename from manifests/charts/dubbo-control/dubbo-discovery/files/gateway.yaml
rename to manifests/charts/dubbo-control/dubbo-discovery/files/kube-gateway.yaml
index 66896c7a..49c29982 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/files/gateway.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/kube-gateway.yaml
@@ -46,7 +46,7 @@ spec:
serviceAccountName: {{ .ServiceAccount }}
containers:
- name: dubbo-proxy
- image: mfordjody/proxyadapter:0.3.2-debug
+ image: mfordjody/dubbo-proxy:0.3.5
imagePullPolicy: Always
ports:
- containerPort: 15020
@@ -144,6 +144,11 @@ kind: Service
metadata:
name: {{ .DeploymentName }}
namespace: {{ .Namespace }}
+ ownerReferences:
+ - apiVersion: gateway.networking.k8s.io/v1
+ kind: Gateway
+ name: {{.Name}}
+ uid: {{.UID}}
labels:
gateway.dubbo.apache.org/managed: {{ .ControllerLabel }}
gateway.networking.k8s.io/gateway-name: {{ .Name }}
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/templates/deployment.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/templates/deployment.yaml
index a27407d7..21948b2f 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/templates/deployment.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/templates/deployment.yaml
@@ -19,7 +19,7 @@ spec:
serviceAccountName: dubbod
containers:
- name: discovery
- image: "mfordjody/planet:0.3.0-debug"
+ image: "mfordjody/dubbo-discovery:0.3.5"
imagePullPolicy: Always
args:
- "discovery"
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/templates/dubbd-injector-configmap.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/templates/dubbd-injector-configmap.yaml
index 216a6858..c677b584 100644
---
a/manifests/charts/dubbo-control/dubbo-discovery/templates/dubbd-injector-configmap.yaml
+++
b/manifests/charts/dubbo-control/dubbo-discovery/templates/dubbd-injector-configmap.yaml
@@ -8,12 +8,12 @@ metadata:
dubbo.apache.org/rev: default
data:
config: |-
- defaultTemplates: [grpc-agent]
+ defaultTemplates: [kube-gateway]
policy: enabled
templates:
grpc-agent: |
{{ .Files.Get "files/grpc-agent.yaml" | trim | indent 8 }}
gateway: |
-{{ .Files.Get "files/gateway.yaml" | trim | indent 8 }}
+{{ .Files.Get "files/kube-gateway.yaml" | trim | indent 8 }}
values: |-
diff --git a/manifests/charts/dubbo-control/dubbo-discovery/values.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/values.yaml
index 7ef132da..aee348aa 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/values.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/values.yaml
@@ -15,7 +15,7 @@
_internal_default_values_not_set:
hub: ""
- tag: 0.3.0
+ tag: 0.3.5
image: planet
resources:
diff --git a/pkg/config/model.go b/pkg/config/model.go
index a3391b35..c8d3a028 100644
--- a/pkg/config/model.go
+++ b/pkg/config/model.go
@@ -26,6 +26,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/slices"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/pkg/util/gogoprotomarshal"
gogojsonpb "github.com/gogo/protobuf/jsonpb" // nolint: depguard
gogoproto "github.com/gogo/protobuf/proto" // nolint: depguard
"google.golang.org/protobuf/proto"
@@ -33,11 +35,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
kubetypes "k8s.io/apimachinery/pkg/types"
- "sigs.k8s.io/yaml"
-
- "github.com/apache/dubbo-kubernetes/pkg/maps"
- "github.com/apache/dubbo-kubernetes/pkg/util/gogoprotomarshal"
- "github.com/apache/dubbo-kubernetes/pkg/util/sets"
)
// Meta is metadata attached to each configuration unit.
@@ -125,19 +122,6 @@ func LabelsInRevision(lbls map[string]string, rev string)
bool {
return configEnv == rev
}
-func LabelsInRevisionOrTags(lbls map[string]string, rev string, tags
sets.Set[string]) bool {
- if LabelsInRevision(lbls, rev) {
- return true
- }
- configEnv := lbls["dubbo.apache.org/rev"]
- // Otherwise, only return true if revisions equal
- return tags.Contains(configEnv)
-}
-
-func ObjectInRevision(o *Config, rev string) bool {
- return LabelsInRevision(o.Labels, rev)
-}
-
// Spec defines the spec for the config. In order to use below helper methods,
// this must be one of:
// * golang/protobuf Message
@@ -145,40 +129,10 @@ func ObjectInRevision(o *Config, rev string) bool {
// * Able to marshal/unmarshal using json
type Spec any
-func ToMap(s Spec) (map[string]any, error) {
- js, err := ToJSON(s)
- if err != nil {
- return nil, err
- }
-
- // Unmarshal from json bytes to go map
- var data map[string]any
- err = json.Unmarshal(js, &data)
- if err != nil {
- return nil, err
- }
-
- return data, nil
-}
-
-func ToRaw(s Spec) (json.RawMessage, error) {
- js, err := ToJSON(s)
- if err != nil {
- return nil, err
- }
-
- // Unmarshal from json bytes to go map
- return js, nil
-}
-
func ToJSON(s Spec) ([]byte, error) {
return toJSON(s, false)
}
-func ToPrettyJSON(s Spec) ([]byte, error) {
- return toJSON(s, true)
-}
-
func toJSON(s Spec, pretty bool) ([]byte, error) {
indent := ""
if pretty {
@@ -201,26 +155,6 @@ type deepCopier interface {
DeepCopyInterface() any
}
-func ApplyYAML(s Spec, yml string) error {
- js, err := yaml.YAMLToJSON([]byte(yml))
- if err != nil {
- return err
- }
- return ApplyJSON(s, string(js))
-}
-
-func ApplyJSONStrict(s Spec, js string) error {
- // gogo protobuf
- if pb, ok := s.(gogoproto.Message); ok {
- err := gogoprotomarshal.ApplyJSONStrict(js, pb)
- return err
- }
-
- d := json.NewDecoder(bytes.NewReader([]byte(js)))
- d.DisallowUnknownFields()
- return d.Decode(&s)
-}
-
func ApplyJSON(s Spec, js string) error {
// gogo protobuf
if pb, ok := s.(gogoproto.Message); ok {
diff --git a/pkg/dubboagent/agent.go b/pkg/dubboagent/agent.go
index e1ea26f4..eccec379 100644
--- a/pkg/dubboagent/agent.go
+++ b/pkg/dubboagent/agent.go
@@ -40,8 +40,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/bootstrap"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/dubboagent/grpcxds"
- "github.com/apache/dubbo-kubernetes/pkg/dubboagent/pixiu"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
+ "github.com/apache/dubbo-kubernetes/pkg/pixiu"
"github.com/apache/dubbo-kubernetes/pkg/security"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"google.golang.org/grpc"
@@ -156,7 +156,6 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
var bootstrapNode *core.Node
if a.cfg.GRPCBootstrapPath != "" {
- log.Infof("Starting dubbo-agent with GRPC bootstrap path: %s",
a.cfg.GRPCBootstrapPath)
node, err := a.generateGRPCBootstrapWithNode()
if err != nil {
return nil, fmt.Errorf("failed generating gRPC XDS
bootstrap: %v", err)
@@ -181,6 +180,7 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
} else {
log.Warn("GRPC_XDS_BOOTSTRAP not set, bootstrap file will not
be generated")
}
+
if a.proxyConfig.ControlPlaneAuthPolicy !=
mesh.AuthenticationPolicy_NONE {
rootCAForXDS, err := a.FindRootCAForXDS()
if err != nil {
@@ -216,7 +216,6 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
a.wg.Add(1)
go func() {
defer a.wg.Done()
- log.Infof("Opening status port %d", a.proxyConfig.StatusPort)
if err := a.statusSrv.ListenAndServe(); err != nil && err !=
http.ErrServerClosed {
log.Errorf("status server error: %v", err)
}
@@ -233,11 +232,28 @@ func (a *Agent) Run(ctx context.Context) (func(), error) {
if err := a.initializePixiuAgent(ctx); err != nil {
return nil, fmt.Errorf("failed to initialize Pixiu
agent: %v", err)
}
+
+ a.wg.Add(1)
+ go func() {
+ defer a.wg.Done()
+ a.pixiuAgent.Run(ctx)
+ }()
+ } else if a.WaitForSigterm() {
+ // wait for SIGTERM and perform graceful shutdown
+ a.wg.Add(1)
+ go func() {
+ defer a.wg.Done()
+ <-ctx.Done()
+ }()
}
return a.wg.Wait, nil
}
+func (a *Agent) WaitForSigterm() bool {
+ return a.cfg.ProxyType == model.Router
+}
+
func (a *Agent) Close() {
if a.xdsProxy != nil {
a.xdsProxy.close()
@@ -477,7 +493,7 @@ func (a *Agent) generateGRPCBootstrapWithNode()
(*model.Node, error) {
if err != nil {
return nil, fmt.Errorf("failed to resolve absolute path for
bootstrap file: %v", err)
}
- log.Infof("Generating gRPC bootstrap file at: %s (absolute: %s)",
bootstrapPath, absBootstrapPath)
+ log.Debugf("Generating gRPC bootstrap file at: %s (absolute: %s)",
bootstrapPath, absBootstrapPath)
// generate metadata
node, err := a.generateNodeMetadata()
@@ -485,6 +501,8 @@ func (a *Agent) generateGRPCBootstrapWithNode()
(*model.Node, error) {
return nil, fmt.Errorf("failed generating node metadata: %v",
err)
}
+ log.Infof("Dubbo SAN: %v", node.Metadata.DubboSubjectAltName)
+
// GRPC bootstrap requires this. Original implementation injected this
via env variable, but
// this interfere with envoy, we should be able to use both envoy for
TCP/HTTP and proxyless.
node.Metadata.Generator = "grpc"
@@ -505,7 +523,7 @@ func (a *Agent) generateGRPCBootstrapWithNode()
(*model.Node, error) {
if err != nil {
return nil, err
}
- log.Infof("gRPC bootstrap file generated successfully at: %s",
absBootstrapPath)
+ log.Debugf("gRPC bootstrap file generated successfully at: %s",
absBootstrapPath)
return node, nil
}
@@ -629,8 +647,7 @@ func checkSocket(ctx context.Context, socketPath string)
(bool, error) {
return true, nil
}
-// initializePixiuAgent initializes and starts Pixiu agent for router mode
-func (a *Agent) initializePixiuAgent(ctx context.Context) error {
+func (a *Agent) initializePixiuAgent(_ context.Context) error {
configPath := os.Getenv("PROXY_CONFIG_PATH")
if configPath == "" {
configPath = "/etc/pixiu/config/pixiu.yaml"
@@ -639,21 +656,19 @@ func (a *Agent) initializePixiuAgent(ctx context.Context)
error {
binaryPath := os.Getenv("PROXY_BINARY_PATH")
if binaryPath == "" {
possiblePaths := []string{
- "/usr/local/bin/pixiugateway",
- "pixiugateway",
+ "/usr/local/bin/pixiu-gateway",
+ "pixiu-gateway",
}
found := false
for _, path := range possiblePaths {
if strings.Contains(path, "/") {
- // Check absolute path
if _, err := os.Stat(path); err == nil {
binaryPath = path
found = true
break
}
} else {
- // Check in PATH
if fullPath, err := exec.LookPath(path); err ==
nil {
binaryPath = fullPath
found = true
@@ -663,7 +678,7 @@ func (a *Agent) initializePixiuAgent(ctx context.Context)
error {
}
if !found {
- return fmt.Errorf("gateway binary pixiugateway not
found. Please set PROXY_BINARY_PATH environment variable or ensure pixiu binary
is installed at /usr/local/bin/pixiugateway")
+ return fmt.Errorf("gateway binary pixiu not found.
Please set PROXY_BINARY_PATH environment variable or ensure pixiu binary is
installed at /usr/local/bin/pixiu-gateway")
}
}
@@ -675,11 +690,11 @@ func (a *Agent) initializePixiuAgent(ctx context.Context)
error {
return fmt.Errorf("failed to create proxy config directory:
%v", err)
}
- pixiuProxy := pixiu.NewProxy(pixiu.ProxyConfig{
+ pixiuOptions := pixiu.ProxyConfig{
ConfigPath: configPath,
ConfigCleanup: false, // Don't cleanup config file
BinaryPath: binaryPath,
- })
+ }
terminationDrainDuration := 45 * time.Second // Default drain duration
if a.proxyConfig.DrainDuration != nil {
@@ -687,14 +702,8 @@ func (a *Agent) initializePixiuAgent(ctx context.Context)
error {
}
minDrainDuration := 5 * time.Second
+ pixiuProxy := pixiu.NewProxy(pixiuOptions)
a.pixiuAgent = pixiu.NewAgent(pixiuProxy, terminationDrainDuration,
minDrainDuration)
- a.wg.Add(1)
- go func() {
- defer a.wg.Done()
- a.pixiuAgent.Run(ctx)
- }()
-
- log.Infof("pixiu agent initialized for router mode, config path: %s,
binary: %s", configPath, binaryPath)
return nil
}
diff --git a/pkg/dubboagent/xds_proxy.go b/pkg/dubboagent/xds_proxy.go
index f7e2ff30..2346b0e0 100644
--- a/pkg/dubboagent/xds_proxy.go
+++ b/pkg/dubboagent/xds_proxy.go
@@ -32,9 +32,9 @@ import (
dubbogrpc "github.com/apache/dubbo-kubernetes/dubbod/discovery/pkg/grpc"
"github.com/apache/dubbo-kubernetes/dubbod/security/pkg/nodeagent/caclient"
"github.com/apache/dubbo-kubernetes/pkg/channels"
- "github.com/apache/dubbo-kubernetes/pkg/dubboagent/pixiu"
dubbokeepalive "github.com/apache/dubbo-kubernetes/pkg/keepalive"
"github.com/apache/dubbo-kubernetes/pkg/model"
+ "github.com/apache/dubbo-kubernetes/pkg/pixiu"
"github.com/apache/dubbo-kubernetes/pkg/uds"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -114,9 +114,9 @@ type XdsProxy struct {
initialHealthRequest *discovery.DiscoveryRequest
initialDeltaHealthRequest *discovery.DeltaDiscoveryRequest
// Upstream connection for proxyless mode
- bootstrapNode *core.Node
- proxylessConnMutex sync.RWMutex
- proxylessConn *ProxyConnection
+ bootstrapNode *core.Node
+ ConnMutex sync.RWMutex
+ Conn *ProxyConnection
pixiuConverter *pixiu.ConfigConverter
pixiuConfigPath string
@@ -181,9 +181,9 @@ func initXdsProxy(ia *Agent) (*XdsProxy, error) {
// Wait for bootstrap Node to be set (with timeout)
// The Node is set synchronously after bootstrap file
generation in agent.Run()
for i := 0; i < 50; i++ {
- proxy.proxylessConnMutex.RLock()
+ proxy.ConnMutex.RLock()
nodeReady := proxy.bootstrapNode != nil
- proxy.proxylessConnMutex.RUnlock()
+ proxy.ConnMutex.RUnlock()
if nodeReady {
break
}
@@ -193,9 +193,9 @@ func initXdsProxy(ia *Agent) (*XdsProxy, error) {
case <-time.After(100 * time.Millisecond):
}
}
- proxy.proxylessConnMutex.RLock()
+ proxy.ConnMutex.RLock()
nodeReady := proxy.bootstrapNode != nil
- proxy.proxylessConnMutex.RUnlock()
+ proxy.ConnMutex.RUnlock()
if !nodeReady {
proxyLog.Warnf("Bootstrap Node not set after 5 seconds,
proceeding anyway")
}
@@ -211,10 +211,10 @@ func initXdsProxy(ia *Agent) (*XdsProxy, error) {
}
// Establish connection
- connDone, err := proxy.establishProxylessConnection(ia)
+ connDone, err := proxy.establishConnection(ia)
if err != nil {
// Connection failed, log and retry with
exponential backoff
- proxyLog.Warnf("Failed to establish proxyless
upstream connection: %v, retrying in %v", err, backoff)
+ proxyLog.Warnf("Failed to establish upstream
connection: %v, retrying in %v", err, backoff)
select {
case <-proxy.stopChan:
@@ -230,13 +230,13 @@ func initXdsProxy(ia *Agent) (*XdsProxy, error) {
// Connection successful, reset backoff
backoff = time.Second
- proxyLog.Infof("Proxyless upstream connection connected
successfully")
+ proxyLog.Infof("Upstream Connected Successfully")
// Wait for connection to terminate (connDone will be
closed when connection dies)
select {
case <-proxy.stopChan:
return
case <-connDone:
- proxyLog.Warnf("Proxyless connection
terminated, will retry")
+ proxyLog.Warnf("connection terminated, will
retry")
}
}
}()
@@ -294,7 +294,7 @@ func (p *XdsProxy) handleUpstream(ctx context.Context, con
*ProxyConnection, xds
proxyLog.Errorf("connection #%d failed to create stream to
upstream %s: %v", con.conID, p.dubbodAddress, err)
return err
}
- proxyLog.Infof("connected to upstream XDS server: %s id=%d",
p.dubbodAddress, con.conID)
+ proxyLog.Infof("Connected to upstream XDS server: %s id=%d",
p.dubbodAddress, con.conID)
// Log when we start receiving responses
go func() {
@@ -712,26 +712,30 @@ func (p *XdsProxy) getTLSOptions(agent *Agent)
(*dubbogrpc.TLSOptions, error) {
}
func (p *XdsProxy) SetBootstrapNode(node *core.Node) {
- p.proxylessConnMutex.Lock()
+ p.ConnMutex.Lock()
p.bootstrapNode = node
- p.proxylessConnMutex.Unlock()
+ p.ConnMutex.Unlock()
}
-func (p *XdsProxy) establishProxylessConnection(ia *Agent) (<-chan struct{},
error) {
- p.proxylessConnMutex.Lock()
+func (p *XdsProxy) establishConnection(ia *Agent) (<-chan struct{}, error) {
+ p.ConnMutex.Lock()
node := p.bootstrapNode
// Clean up old connection if it exists
- if p.proxylessConn != nil {
- close(p.proxylessConn.stopChan)
- p.proxylessConn = nil
+ if p.Conn != nil {
+ close(p.Conn.stopChan)
+ p.Conn = nil
}
- p.proxylessConnMutex.Unlock()
+ p.ConnMutex.Unlock()
if node == nil {
return nil, fmt.Errorf("bootstrap node not available")
}
- proxyLog.Infof("Connecting proxyless upstream connection with Node:
%s", node.Id)
+ if ia.cfg.ProxyType == model.Proxyless {
+ proxyLog.Infof("Connecting proxyless upstream connection with
Node: %s", node.Id)
+ } else if ia.cfg.ProxyType == model.Router {
+ proxyLog.Infof("Connecting router upstream connection with
Node: %s", node.Id)
+ }
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
@@ -757,7 +761,7 @@ func (p *XdsProxy) establishProxylessConnection(ia *Agent)
(<-chan struct{}, err
_ = upstreamConn.Close()
return nil, fmt.Errorf("failed to create upstream stream: %w",
err)
}
- proxyLog.Infof("connected to upstream XDS server (proxyless): %s",
p.dubbodAddress)
+ proxyLog.Infof("Connected to upstream XDS server: %s", p.dubbodAddress)
conID := connectionNumber.Inc()
con := &ProxyConnection{
@@ -770,9 +774,9 @@ func (p *XdsProxy) establishProxylessConnection(ia *Agent)
(<-chan struct{}, err
node: node,
}
- p.proxylessConnMutex.Lock()
- p.proxylessConn = con
- p.proxylessConnMutex.Unlock()
+ p.ConnMutex.Lock()
+ p.Conn = con
+ p.ConnMutex.Unlock()
// Close upstream connection when connection terminates and signal done
channel
go func() {
@@ -782,11 +786,11 @@ func (p *XdsProxy) establishProxylessConnection(ia
*Agent) (<-chan struct{}, err
case <-p.stopChan:
}
_ = upstreamConn.Close()
- p.proxylessConnMutex.Lock()
- if p.proxylessConn == con {
- p.proxylessConn = nil
+ p.ConnMutex.Lock()
+ if p.Conn == con {
+ p.Conn = nil
}
- p.proxylessConnMutex.Unlock()
+ p.ConnMutex.Unlock()
close(connDone)
}()
@@ -795,14 +799,14 @@ func (p *XdsProxy) establishProxylessConnection(ia
*Agent) (<-chan struct{}, err
TypeUrl: model.ListenerType,
Node: node,
}
- proxyLog.Infof("proxyless connection sending initial LDS request with
Node: %s", node.Id)
+ proxyLog.Debugf("connection sending initial LDS request with Node: %s",
node.Id)
if err := upstream.Send(ldsReq); err != nil {
_ = upstreamConn.Close()
- p.proxylessConnMutex.Lock()
- if p.proxylessConn == con {
- p.proxylessConn = nil
+ p.ConnMutex.Lock()
+ if p.Conn == con {
+ p.Conn = nil
}
- p.proxylessConnMutex.Unlock()
+ p.ConnMutex.Unlock()
close(connDone)
return nil, fmt.Errorf("failed to send initial LDS request:
%w", err)
}
@@ -830,7 +834,7 @@ func (p *XdsProxy) establishProxylessConnection(ia *Agent) (<-chan struct{}, err
upstreamErr(con, err)
return
}
- proxyLog.Debugf("proxyless connection received response: TypeUrl=%s, Resources=%d",
+ proxyLog.Debugf("connection received response: TypeUrl=%s, Resources=%d",
model.GetShortType(resp.TypeUrl), len(resp.Resources))
// Send ACK
ackReq := &discovery.DiscoveryRequest{
@@ -876,58 +880,6 @@ func (p *XdsProxy) establishProxylessConnection(ia *Agent) (<-chan struct{}, err
return connDone, nil
}
-func (p *XdsProxy) close() {
- close(p.stopChan)
- p.proxylessConnMutex.Lock()
- if p.proxylessConn != nil {
- close(p.proxylessConn.stopChan)
- p.proxylessConn = nil
- }
- p.proxylessConnMutex.Unlock()
- if p.downstreamGrpcServer != nil {
- p.downstreamGrpcServer.Stop()
- }
- if p.downstreamListener != nil {
- _ = p.downstreamListener.Close()
- }
-}
-
-func (con *ProxyConnection) sendRequest(req *discovery.DiscoveryRequest) {
- con.requestsChan.Put(req)
-}
-
-func upstreamErr(con *ProxyConnection, err error) {
- switch dubbogrpc.GRPCErrorType(err) {
- case dubbogrpc.GracefulTermination:
- err = nil
- fallthrough
- case dubbogrpc.ExpectedError:
- proxyLog.Errorf("upstream terminated with status %v", err)
- default:
- proxyLog.Errorf("upstream terminated with unexpected error %v", err)
- }
- select {
- case con.upstreamError <- err:
- case <-con.stopChan:
- }
-}
-
-func downstreamErr(con *ProxyConnection, err error) {
- switch dubbogrpc.GRPCErrorType(err) {
- case dubbogrpc.GracefulTermination:
- err = nil
- fallthrough
- case dubbogrpc.ExpectedError:
- proxyLog.Errorf("downstream terminated with status %v", err)
- default:
- proxyLog.Errorf("downstream terminated with unexpected error %v", err)
- }
- select {
- case con.downstreamError <- err:
- case <-con.stopChan:
- }
-}
-
// updatePixiuConfig updates Pixiu configuration from xDS responses
func (p *XdsProxy) updatePixiuConfig(resp *discovery.DiscoveryResponse) {
if p.pixiuConverter == nil {
@@ -1002,3 +954,55 @@ func (p *XdsProxy) updatePixiuConfig(resp *discovery.DiscoveryResponse) {
proxyLog.Infof("Updated Pixiu config at %s", p.pixiuConfigPath)
}
+
+func (p *XdsProxy) close() {
+ close(p.stopChan)
+ p.ConnMutex.Lock()
+ if p.Conn != nil {
+ close(p.Conn.stopChan)
+ p.Conn = nil
+ }
+ p.ConnMutex.Unlock()
+ if p.downstreamGrpcServer != nil {
+ p.downstreamGrpcServer.Stop()
+ }
+ if p.downstreamListener != nil {
+ _ = p.downstreamListener.Close()
+ }
+}
+
+func (con *ProxyConnection) sendRequest(req *discovery.DiscoveryRequest) {
+ con.requestsChan.Put(req)
+}
+
+func upstreamErr(con *ProxyConnection, err error) {
+ switch dubbogrpc.GRPCErrorType(err) {
+ case dubbogrpc.GracefulTermination:
+ err = nil
+ fallthrough
+ case dubbogrpc.ExpectedError:
+ proxyLog.Errorf("upstream terminated with status %v", err)
+ default:
+ proxyLog.Errorf("upstream terminated with unexpected error %v", err)
+ }
+ select {
+ case con.upstreamError <- err:
+ case <-con.stopChan:
+ }
+}
+
+func downstreamErr(con *ProxyConnection, err error) {
+ switch dubbogrpc.GRPCErrorType(err) {
+ case dubbogrpc.GracefulTermination:
+ err = nil
+ fallthrough
+ case dubbogrpc.ExpectedError:
+ proxyLog.Errorf("downstream terminated with status %v", err)
+ default:
+ proxyLog.Errorf("downstream terminated with unexpected error %v", err)
+ }
+ select {
+ case con.downstreamError <- err:
+ case <-con.stopChan:
+ }
+}
diff --git a/pkg/kube/inject/inject.go b/pkg/kube/inject/inject.go
index 4164b3e9..a0c6845e 100644
--- a/pkg/kube/inject/inject.go
+++ b/pkg/kube/inject/inject.go
@@ -114,7 +114,7 @@ func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod, templatePod
MeshGlobalConfig: meshGlobalConfig,
Values: params.valuesConfig.asMap,
Revision: params.revision,
- ProxyImage: getProxyImage(params.valuesConfig.asMap, "mfordjody/proxyadapter:0.3.2-debug"),
+ ProxyImage: getProxyImage(params.valuesConfig.asMap, "mfordjody/dubbo-proxy:0.3.5"),
CompliancePolicy: common_features.CompliancePolicy,
}
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
index bfab0bc3..853c61a3 100644
--- a/pkg/kube/inject/webhook.go
+++ b/pkg/kube/inject/webhook.go
@@ -283,7 +283,7 @@ func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) *kube.Admission
return toAdmissionResponse(err)
}
- log.Infof("Injection successful, patch size: %d bytes", len(patchBytes))
+ log.Infof("Injection Successfully, patch size: %d bytes", len(patchBytes))
reviewResponse := kube.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
diff --git a/pkg/dubboagent/pixiu/agent.go b/pkg/pixiu/agent.go
similarity index 63%
rename from pkg/dubboagent/pixiu/agent.go
rename to pkg/pixiu/agent.go
index 67008438..27740d5a 100644
--- a/pkg/dubboagent/pixiu/agent.go
+++ b/pkg/pixiu/agent.go
@@ -20,8 +20,6 @@ import (
"context"
"errors"
"time"
-
- "go.uber.org/atomic"
)
var errAbort = errors.New("gateway aborted")
@@ -35,8 +33,6 @@ type Agent struct {
terminationDrainDuration time.Duration
minDrainDuration time.Duration
-
- skipDrain *atomic.Bool
}
type exitStatus struct {
@@ -51,72 +47,47 @@ func NewAgent(proxy Proxy, terminationDrainDuration, minDrainDuration time.Durat
abortCh: make(chan error, 1),
terminationDrainDuration: terminationDrainDuration,
minDrainDuration: minDrainDuration,
- skipDrain: atomic.NewBool(false),
}
}
-// Run starts the Pixiu and waits until it terminates
func (a *Agent) Run(ctx context.Context) {
- log.Info("starting gateway proxy agent")
+ pixiulog.Info("Starting gateway agent")
go a.runWait(a.abortCh)
select {
case status := <-a.statusCh:
if status.err != nil {
- log.Errorf("gateway exited with error: %v", status.err)
+ pixiulog.Errorf("gateway exited with error: %v", status.err)
} else {
- log.Infof("gateway exited normally")
+ pixiulog.Infof("gateway exited normally")
}
case <-ctx.Done():
a.terminate()
- log.Info("agent has successfully terminated")
- }
-}
-
-func (a *Agent) DisableDraining() {
- a.skipDrain.Store(true)
-}
-
-func (a *Agent) DrainNow() {
- log.Infof("agent draining Pixiu")
- err := a.proxy.Drain(true)
- if err != nil {
- log.Warnf("error in invoking drain: %v", err)
+ pixiulog.Info("agent has successfully terminated")
}
- a.DisableDraining()
}
// terminate starts exiting the process
func (a *Agent) terminate() {
- log.Infof("agent draining gateway proxy for termination")
- if a.skipDrain.Load() {
- log.Infof("agent already drained, exiting immediately")
- a.abortCh <- errAbort
- return
- }
- e := a.proxy.Drain(false)
- if e != nil {
- log.Warnf("error in invoking drain: %v", e)
- }
-
- log.Infof("graceful termination period is %v, starting...", a.terminationDrainDuration)
+ pixiulog.Infof("agent draining gateway proxy for termination")
+ pixiulog.Infof("graceful termination period is %v, starting...", a.terminationDrainDuration)
select {
case status := <-a.statusCh:
- log.Infof("pixiu exited with status %v", status.err)
- log.Infof("graceful termination logic ended prematurely, gateway process terminated early")
+ pixiulog.Infof("pixiu exited with status %v", status.err)
+ pixiulog.Infof("graceful termination logic ended prematurely, gateway process terminated early")
return
case <-time.After(a.terminationDrainDuration):
- log.Infof("graceful termination period complete, terminating remaining processes.")
+ pixiulog.Infof("graceful termination period complete, terminating remaining processes.")
a.abortCh <- errAbort
}
status := <-a.statusCh
if status.err == errAbort {
- log.Infof("gateway aborted normally")
+ pixiulog.Infof("gateway aborted normally")
} else {
- log.Warnf("gateway aborted abnormally")
+ pixiulog.Warnf("gateway aborted abnormally")
}
- log.Warnf("aborted gateway instance")
+ pixiulog.Warnf("aborted gateway instance")
}
// runWait runs the start-up command as a go routine and waits for it to finish
diff --git a/pkg/pixiu/config.go b/pkg/pixiu/config.go
new file mode 100644
index 00000000..0938c003
--- /dev/null
+++ b/pkg/pixiu/config.go
@@ -0,0 +1,64 @@
+package pixiu
+
+// PixiuBootstrap represents Pixiu Bootstrap configuration
+type PixiuBootstrap struct {
+ StaticResources StaticResources `yaml:"static_resources" json:"static_resources"`
+}
+
+// StaticResources contains static resources
+type StaticResources struct {
+ Listeners []*Listener `yaml:"listeners" json:"listeners"`
+ Clusters []*Cluster `yaml:"clusters" json:"clusters"`
+}
+
+// Listener represents a Pixiu listener
+type Listener struct {
+ Name string `yaml:"name" json:"name"`
+ Address Address `yaml:"address" json:"address"`
+ ProtocolStr string `yaml:"protocol_type" json:"protocol_type"`
+ FilterChain FilterChain `yaml:"filter_chains" json:"filter_chains"`
+}
+
+// Address represents network address
+type Address struct {
+ SocketAddress SocketAddress `yaml:"socket_address" json:"socket_address"`
+}
+
+// SocketAddress represents socket address
+type SocketAddress struct {
+ Address string `yaml:"address" json:"address"`
+ PortValue int `yaml:"port_value" json:"port_value"`
+}
+
+// FilterChain represents filter chain
+type FilterChain struct {
+ Filters []Filter `yaml:"filters" json:"filters"`
+}
+
+// Filter represents a filter
+type Filter struct {
+ Name string `yaml:"name" json:"name"`
+ Config map[string]interface{} `yaml:"config" json:"config"`
+}
+
+// Cluster represents a Pixiu cluster
+type Cluster struct {
+ Name string `yaml:"name" json:"name"`
+ Type string `yaml:"type" json:"type"`
+ LbPolicy string `yaml:"lb_policy" json:"lb_policy"`
+ Endpoints []Endpoint `yaml:"endpoints" json:"endpoints"`
+ HealthCheck *HealthCheck `yaml:"health_check,omitempty" json:"health_check,omitempty"`
+}
+
+// Endpoint represents an endpoint
+type Endpoint struct {
+ Address Address `yaml:"address" json:"address"`
+}
+
+// HealthCheck represents health check configuration
+type HealthCheck struct {
+ Timeout string `yaml:"timeout" json:"timeout"`
+ Interval string `yaml:"interval" json:"interval"`
+ UnhealthyThreshold int `yaml:"unhealthy_threshold" json:"unhealthy_threshold"`
+ HealthyThreshold int `yaml:"healthy_threshold" json:"healthy_threshold"`
+}
diff --git a/pkg/dubboagent/pixiu/converter.go b/pkg/pixiu/converter.go
similarity index 69%
rename from pkg/dubboagent/pixiu/converter.go
rename to pkg/pixiu/converter.go
index 0a4e061b..f8ccb49f 100644
--- a/pkg/dubboagent/pixiu/converter.go
+++ b/pkg/pixiu/converter.go
@@ -67,9 +67,9 @@ func (c *ConfigConverter) UpdateEndpoint(name string, e *endpoint.ClusterLoadAss
// ConvertToPixiuConfig converts xDS configuration to Pixiu YAML configuration
func (c *ConfigConverter) ConvertToPixiuConfig() ([]byte, error) {
pixiuConfig := &PixiuBootstrap{
- StaticResources: PixiuStaticResources{
- Listeners: []*PixiuListener{},
- Clusters: []*PixiuCluster{},
+ StaticResources: StaticResources{
+ Listeners: []*Listener{},
+ Clusters: []*Cluster{},
},
}
@@ -97,11 +97,11 @@ func (c *ConfigConverter) ConvertToPixiuConfig() ([]byte, error) {
}
// convertListener converts Envoy listener to Pixiu listener
-func (c *ConfigConverter) convertListener(name string, l *listener.Listener) *PixiuListener {
+func (c *ConfigConverter) convertListener(name string, l *listener.Listener) *Listener {
// Only process listeners on port 80 (Gateway HTTP listener)
port := int(l.Address.GetSocketAddress().GetPortValue())
if port != 80 {
- log.Debugf("Skipping listener %s on port %d (not Gateway port 80)", name, port)
+ pixiulog.Debugf("Skipping listener %s on port %d (not Gateway port 80)", name, port)
return nil
}
@@ -125,7 +125,7 @@ func (c *ConfigConverter) convertListener(name string, l *listener.Listener) *Pi
}
if hcmFilter == nil {
- log.Debugf("Listener %s does not have HttpConnectionManager filter", name)
+ pixiulog.Debugf("Listener %s does not have HttpConnectionManager filter", name)
return nil
}
@@ -136,12 +136,12 @@ func (c *ConfigConverter) convertListener(name string, l *listener.Listener) *Pi
routeConfigName = inlineRoute.Name
}
- pixiuListener := &PixiuListener{
+ pixiuListener := &Listener{
Name: name,
- Address: PixiuAddress{},
+ Address: Address{},
ProtocolStr: "http",
- FilterChain: PixiuFilterChain{
- Filters: []PixiuFilter{
+ FilterChain: FilterChain{
+ Filters: []Filter{
{
Name: "http",
Config: map[string]interface{}{
@@ -157,8 +157,8 @@ func (c *ConfigConverter) convertListener(name string, l *listener.Listener) *Pi
// Set address
addr := l.Address.GetSocketAddress()
if addr != nil {
- pixiuListener.Address = PixiuAddress{
- SocketAddress: PixiuSocketAddress{
+ pixiuListener.Address = Address{
+ SocketAddress: SocketAddress{
Address: addr.Address,
PortValue: int(addr.GetPortValue()),
},
@@ -169,8 +169,8 @@ func (c *ConfigConverter) convertListener(name string, l *listener.Listener) *Pi
}
// convertCluster converts Envoy cluster to Pixiu cluster
-func (c *ConfigConverter) convertCluster(name string, cl *cluster.Cluster) *PixiuCluster {
- pixiuCluster := &PixiuCluster{
+func (c *ConfigConverter) convertCluster(name string, cl *cluster.Cluster) *Cluster {
+ pixiuCluster := &Cluster{
Name: name,
Type: "EDS", // Pixiu supports EDS, STRICT_DNS, etc.
LbPolicy: "round_robin",
@@ -223,7 +223,7 @@ func (c *ConfigConverter) convertCluster(name string, cl *cluster.Cluster) *Pixi
// Convert health check
if cl.HealthChecks != nil && len(cl.HealthChecks) > 0 {
hc := cl.HealthChecks[0]
- pixiuCluster.HealthCheck = &PixiuHealthCheck{
+ pixiuCluster.HealthCheck = &HealthCheck{
Timeout: hc.Timeout.AsDuration().String(),
Interval: hc.Interval.AsDuration().String(),
UnhealthyThreshold: int(hc.UnhealthyThreshold.GetValue()),
@@ -235,15 +235,15 @@ func (c *ConfigConverter) convertCluster(name string, cl *cluster.Cluster) *Pixi
}
// convertEndpoints converts Envoy endpoints to Pixiu endpoints
-func (c *ConfigConverter) convertEndpoints(assignment *endpoint.ClusterLoadAssignment) []PixiuEndpoint {
- var endpoints []PixiuEndpoint
+func (c *ConfigConverter) convertEndpoints(assignment *endpoint.ClusterLoadAssignment) []Endpoint {
+ var endpoints []Endpoint
for _, localityLbEndpoints := range assignment.Endpoints {
for _, lbEndpoint := range localityLbEndpoints.LbEndpoints {
if socketAddress := lbEndpoint.GetEndpoint().Address.GetSocketAddress(); socketAddress != nil {
- endpoints = append(endpoints, PixiuEndpoint{
- Address: PixiuAddress{
- SocketAddress: PixiuSocketAddress{
+ endpoints = append(endpoints, Endpoint{
+ Address: Address{
+ SocketAddress: SocketAddress{
Address: socketAddress.Address,
PortValue: int(socketAddress.GetPortValue()),
},
@@ -255,66 +255,3 @@ func (c *ConfigConverter) convertEndpoints(assignment *endpoint.ClusterLoadAssig
return endpoints
}
-
-// PixiuBootstrap represents Pixiu Bootstrap configuration
-type PixiuBootstrap struct {
- StaticResources PixiuStaticResources `yaml:"static_resources" json:"static_resources"`
-}
-
-// PixiuStaticResources contains static resources
-type PixiuStaticResources struct {
- Listeners []*PixiuListener `yaml:"listeners" json:"listeners"`
- Clusters []*PixiuCluster `yaml:"clusters" json:"clusters"`
-}
-
-// PixiuListener represents a Pixiu listener
-type PixiuListener struct {
- Name string `yaml:"name" json:"name"`
- Address PixiuAddress `yaml:"address" json:"address"`
- ProtocolStr string `yaml:"protocol_type" json:"protocol_type"`
- FilterChain PixiuFilterChain `yaml:"filter_chains" json:"filter_chains"`
-}
-
-// PixiuAddress represents network address
-type PixiuAddress struct {
- SocketAddress PixiuSocketAddress `yaml:"socket_address" json:"socket_address"`
-}
-
-// PixiuSocketAddress represents socket address
-type PixiuSocketAddress struct {
- Address string `yaml:"address" json:"address"`
- PortValue int `yaml:"port_value" json:"port_value"`
-}
-
-// PixiuFilterChain represents filter chain
-type PixiuFilterChain struct {
- Filters []PixiuFilter `yaml:"filters" json:"filters"`
-}
-
-// PixiuFilter represents a filter
-type PixiuFilter struct {
- Name string `yaml:"name" json:"name"`
- Config map[string]interface{} `yaml:"config" json:"config"`
-}
-
-// PixiuCluster represents a Pixiu cluster
-type PixiuCluster struct {
- Name string `yaml:"name" json:"name"`
- Type string `yaml:"type" json:"type"`
- LbPolicy string `yaml:"lb_policy" json:"lb_policy"`
- Endpoints []PixiuEndpoint `yaml:"endpoints" json:"endpoints"`
- HealthCheck *PixiuHealthCheck `yaml:"health_check,omitempty" json:"health_check,omitempty"`
-}
-
-// PixiuEndpoint represents an endpoint
-type PixiuEndpoint struct {
- Address PixiuAddress `yaml:"address" json:"address"`
-}
-
-// PixiuHealthCheck represents health check configuration
-type PixiuHealthCheck struct {
- Timeout string `yaml:"timeout" json:"timeout"`
- Interval string `yaml:"interval" json:"interval"`
- UnhealthyThreshold int `yaml:"unhealthy_threshold" json:"unhealthy_threshold"`
- HealthyThreshold int `yaml:"healthy_threshold" json:"healthy_threshold"`
-}
diff --git a/pkg/dubboagent/pixiu/proxy.go b/pkg/pixiu/proxy.go
similarity index 74%
rename from pkg/dubboagent/pixiu/proxy.go
rename to pkg/pixiu/proxy.go
index 8005bc87..23037b28 100644
--- a/pkg/dubboagent/pixiu/proxy.go
+++ b/pkg/pixiu/proxy.go
@@ -24,11 +24,10 @@ import (
"path/filepath"
)
-var log = dubbolog.RegisterScope("gateway", "pixiu gateway proxy")
+var pixiulog = dubbolog.RegisterScope("gateway", "pixiu gateway debugging")
type Proxy interface {
Run(<-chan error) error
- Drain(skipExit bool) error
Cleanup()
UpdateConfig(config []byte) error
}
@@ -39,48 +38,40 @@ type ProxyConfig struct {
BinaryPath string
}
-type pixiuProxy struct {
+type pixiu struct {
ProxyConfig
}
func NewProxy(cfg ProxyConfig) Proxy {
- return &pixiuProxy{
+ return &pixiu{
ProxyConfig: cfg,
}
}
-func (p *pixiuProxy) Drain(skipExit bool) error {
- return nil
-}
-
-func (p *pixiuProxy) UpdateConfig(config []byte) error {
+func (p *pixiu) UpdateConfig(config []byte) error {
if err := os.WriteFile(p.ConfigPath, config, 0o666); err != nil {
return fmt.Errorf("failed to write gateway config: %v", err)
}
- log.Infof("updated gateway config at %s", p.ConfigPath)
+ pixiulog.Infof("updated gateway config at %s", p.ConfigPath)
return nil
}
-func (p *pixiuProxy) Cleanup() {
+func (p *pixiu) Cleanup() {
if p.ConfigCleanup {
if err := os.Remove(p.ConfigPath); err != nil {
- log.Warnf("Failed to delete config file %s: %v", p.ConfigPath, err)
+ pixiulog.Warnf("Failed to delete config file %s: %v", p.ConfigPath, err)
}
}
}
-func (p *pixiuProxy) Run(abort <-chan error) error {
+func (p *pixiu) Run(abort <-chan error) error {
if err := os.MkdirAll(filepath.Dir(p.ConfigPath), 0755); err != nil {
return fmt.Errorf("failed to create config directory: %v", err)
}
- args := []string{
- "gateway",
- "start",
- "-c", p.ConfigPath,
- }
+ args := []string{"gateway", "start", "-c", p.ConfigPath}
- log.Infof("pixiu command: %s %v", p.BinaryPath, args)
+ pixiulog.Infof("Pixiu command: %s %v", p.BinaryPath, args)
cmd := exec.Command(p.BinaryPath, args...)
cmd.Env = os.Environ()
@@ -100,9 +91,9 @@ func (p *pixiuProxy) Run(abort <-chan error) error {
select {
case err := <-abort:
- log.Warnf("aborting gateway")
+ pixiulog.Warnf("Aborting proxy")
if errKill := cmd.Process.Kill(); errKill != nil {
- log.Warnf("killing gateway caused an error %v", errKill)
+ pixiulog.Warnf("killing proxy caused an error %v", errKill)
}
return err
case err := <-done:
diff --git a/samples/grpc-app/README.md b/samples/grpc-app/README.md
index a641e561..967104ad 100644
--- a/samples/grpc-app/README.md
+++ b/samples/grpc-app/README.md
@@ -105,7 +105,7 @@ spec:
EOF
```
-Now, send a set of 10 requests to verify the traffic distribution:
+Now, send a set of 5 requests to verify the traffic distribution:
```bash
grpcurl -plaintext -d '{"url": "xds:///provider.grpc-app.svc.cluster.local:7070","count": 5}' localhost:17171 echo.EchoTestService/ForwardEcho
@@ -174,7 +174,7 @@ To enable server-side mTLS, apply a `PeerAuthentication` policy. The following p
```bash
cat <<EOF | kubectl apply -f -
-apiVersion: security.dubbo.apache.org/v1
+apiVersion: security.dubbo.apache.org/v1alpha3
kind: PeerAuthentication
metadata:
name: provider-mtls