This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new ca8d70b4 [discovery] add proxyless webhook inject code (#798)
ca8d70b4 is described below
commit ca8d70b47542e923e6395d5f93b314652f28f28d
Author: Jian Zhong <[email protected]>
AuthorDate: Wed Oct 1 00:40:46 2025 +0800
[discovery] add proxyless webhook inject code (#798)
---
go.mod | 6 +-
go.sum | 6 +-
pkg/kube/adapter.go | 226 ++++++++++++
pkg/kube/controllers/common.go | 61 ++++
pkg/kube/controllers/queue.go | 120 +++++++
pkg/kube/inject/inject.go | 65 ++++
pkg/kube/inject/template.go | 11 +
pkg/kube/inject/watcher.go | 161 +++++++++
pkg/kube/inject/webhook.go | 396 +++++++++++++++++++++
pkg/kube/util.go | 102 ++++++
.../watcher/configmapwatcher/configmapwatcher.go | 64 ++++
pkg/webhooks/util/util.go | 45 +++
pkg/webhooks/webhookpatch.go | 142 ++++++++
sail/cmd/sail-discovery/app/cmd.go | 3 +
sail/pkg/bootstrap/options.go | 6 +
sail/pkg/bootstrap/proxylessinjector.go | 100 ++++++
sail/pkg/bootstrap/server.go | 19 +
sail/pkg/features/sail.go | 2 +
sail/pkg/model/context.go | 4 +
19 files changed, 1534 insertions(+), 5 deletions(-)
diff --git a/go.mod b/go.mod
index a6b993a4..29fc61a5 100644
--- a/go.mod
+++ b/go.mod
@@ -19,9 +19,11 @@ go 1.24.0
require (
github.com/AlecAivazis/survey/v2 v2.3.7
+ github.com/Masterminds/sprig/v3 v3.3.0
github.com/Microsoft/go-winio v0.6.2
github.com/buildpacks/pack v0.30.0
github.com/cenkalti/backoff/v4 v4.3.0
+ github.com/cespare/xxhash/v2 v2.3.0
github.com/cheggaaa/pb/v3 v3.1.7
github.com/chzyer/readline v1.5.1
github.com/containers/image/v5 v5.34.0
@@ -45,7 +47,6 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-multierror v1.1.1
- github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/heroku/color v0.0.6
github.com/moby/term v0.5.2
github.com/ory/viper v1.7.5
@@ -61,6 +62,7 @@ require (
golang.org/x/sys v0.35.0
golang.org/x/term v0.34.0
golang.org/x/time v0.12.0
+ gomodules.xyz/jsonpatch/v2 v2.5.0
google.golang.org/grpc v1.74.2
google.golang.org/protobuf v1.36.7
gopkg.in/yaml.v3 v3.0.1
@@ -94,7 +96,6 @@ require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/semver/v3 v3.4.0 // indirect
- github.com/Masterminds/sprig/v3 v3.3.0 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/ProtonMail/go-crypto v1.1.3 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
@@ -121,7 +122,6 @@ require (
github.com/buildpacks/imgutil v0.0.0-20230626185301-726f02e4225c //
indirect
github.com/buildpacks/lifecycle v0.17.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
- github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chrismellard/docker-credential-acr-env
v0.0.0-20230304212654-82a0ddb27589 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
diff --git a/go.sum b/go.sum
index 5fc89569..b49d760b 100644
--- a/go.sum
+++ b/go.sum
@@ -255,6 +255,8 @@ github.com/envoyproxy/go-control-plane/envoy
v1.32.5-0.20250627145903-197b96a9c7
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.2.1
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
+github.com/evanphx/json-patch v5.9.11+incompatible
h1:ixHHqfcGvxhWkniF1tWxBHA0yb4Z+d1UQi45df52xW8=
+github.com/evanphx/json-patch v5.9.11+incompatible/go.mod
h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod
h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
@@ -392,8 +394,6 @@ github.com/hashicorp/errwrap v1.1.0
h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1
h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod
h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
-github.com/hashicorp/golang-lru/v2 v2.0.5
h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
-github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heroku/color v0.0.6 h1:UTFFMrmMLFcL3OweqP1lAdp8i1y/9oHqkeHjQ/b/Ny0=
@@ -808,6 +808,8 @@ golang.org/x/xerrors
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gomodules.xyz/jsonpatch/v2 v2.5.0
h1:JELs8RLM12qJGXU4u/TO3V25KW8GreMKl9pdkk14RM0=
+gomodules.xyz/jsonpatch/v2 v2.5.0/go.mod
h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
google.golang.org/api v0.183.0 h1:PNMeRDwo1pJdgNcFQ9GstuLe/noWKIc89pRWRLMvLwE=
google.golang.org/api v0.183.0/go.mod
h1:q43adC5/pHoSZTx5h2mSmdF7NcyfW9JuDyIOJAgS9ZQ=
google.golang.org/appengine v1.1.0/go.mod
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
diff --git a/pkg/kube/adapter.go b/pkg/kube/adapter.go
new file mode 100644
index 00000000..b6a84862
--- /dev/null
+++ b/pkg/kube/adapter.go
@@ -0,0 +1,226 @@
+package kube
+
+import (
+ "fmt"
+ admissionv1 "k8s.io/api/admission/v1"
+ kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
+ authenticationv1 "k8s.io/api/authentication/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+)
+
+const (
+ // APIVersion constants
+ admissionAPIV1 = "admission.k8s.io/v1"
+ admissionAPIV1beta1 = "admission.k8s.io/v1beta1"
+
+ // Operation constants
+ Create string = "CREATE"
+ Update string = "UPDATE"
+ Delete string = "DELETE"
+ Connect string = "CONNECT"
+)
+
+type AdmissionResponse struct {
+ UID types.UID `json:"uid"`
+ Allowed bool `json:"allowed"`
+ Result *metav1.Status `json:"status,omitempty"`
+ Patch []byte `json:"patch,omitempty"`
+ PatchType *string `json:"patchType,omitempty"`
+ AuditAnnotations map[string]string `json:"auditAnnotations,omitempty"`
+ Warnings []string `json:"warnings,omitempty"`
+}
+
+type AdmissionRequest struct {
+ UID types.UID `json:"uid"`
+ Kind metav1.GroupVersionKind `json:"kind"`
+ Resource metav1.GroupVersionResource `json:"resource"`
+ SubResource string
`json:"subResource,omitempty"`
+ RequestSubResource string
`json:"requestSubResource,omitempty"`
+ RequestKind *metav1.GroupVersionKind
`json:"requestKind,omitempty"`
+ RequestResource *metav1.GroupVersionResource
`json:"requestResource,omitempty"`
+ UserInfo authenticationv1.UserInfo `json:"userInfo"`
+ Name string `json:"name,omitempty"`
+ Namespace string
`json:"namespace,omitempty"`
+ Operation string `json:"operation"`
+ Object runtime.RawExtension
`json:"object,omitempty"`
+ OldObject runtime.RawExtension
`json:"oldObject,omitempty"`
+ DryRun *bool
`json:"dryRun,omitempty"`
+ Options runtime.RawExtension
`json:"options,omitempty"`
+}
+
+type AdmissionReview struct {
+ metav1.TypeMeta
+ Request *AdmissionRequest `json:"request,omitempty"`
+ Response *AdmissionResponse `json:"response,omitempty"`
+}
+
+func AdmissionReviewKubeToAdapter(object runtime.Object) (*AdmissionReview,
error) {
+ var typeMeta metav1.TypeMeta
+ var req *AdmissionRequest
+ var resp *AdmissionResponse
+ switch obj := object.(type) {
+ case *kubeApiAdmissionv1beta1.AdmissionReview:
+ typeMeta = obj.TypeMeta
+ arv1beta1Response := obj.Response
+ arv1beta1Request := obj.Request
+ if arv1beta1Response != nil {
+ resp = &AdmissionResponse{
+ UID: arv1beta1Response.UID,
+ Allowed: arv1beta1Response.Allowed,
+ Result: arv1beta1Response.Result,
+ Patch: arv1beta1Response.Patch,
+ Warnings: arv1beta1Response.Warnings,
+ }
+ if arv1beta1Response.PatchType != nil {
+ patchType :=
string(*arv1beta1Response.PatchType)
+ resp.PatchType = &patchType
+ }
+ }
+ if arv1beta1Request != nil {
+ req = &AdmissionRequest{
+ UID: arv1beta1Request.UID,
+ Kind: arv1beta1Request.Kind,
+ Resource: arv1beta1Request.Resource,
+ UserInfo: arv1beta1Request.UserInfo,
+ Name: arv1beta1Request.Name,
+ Namespace: arv1beta1Request.Namespace,
+ Operation: string(arv1beta1Request.Operation),
+ Object: arv1beta1Request.Object,
+ OldObject: arv1beta1Request.OldObject,
+ DryRun: arv1beta1Request.DryRun,
+ }
+ }
+
+ case *admissionv1.AdmissionReview:
+ typeMeta = obj.TypeMeta
+ arv1Response := obj.Response
+ arv1Request := obj.Request
+ if arv1Response != nil {
+ resp = &AdmissionResponse{
+ UID: arv1Response.UID,
+ Allowed: arv1Response.Allowed,
+ Result: arv1Response.Result,
+ Patch: arv1Response.Patch,
+ Warnings: arv1Response.Warnings,
+ }
+ if arv1Response.PatchType != nil {
+ patchType := string(*arv1Response.PatchType)
+ resp.PatchType = &patchType
+ }
+ }
+
+ if arv1Request != nil {
+ req = &AdmissionRequest{
+ UID: arv1Request.UID,
+ Kind: arv1Request.Kind,
+ Resource: arv1Request.Resource,
+ UserInfo: arv1Request.UserInfo,
+ Name: arv1Request.Name,
+ Namespace: arv1Request.Namespace,
+ Operation: string(arv1Request.Operation),
+ Object: arv1Request.Object,
+ OldObject: arv1Request.OldObject,
+ DryRun: arv1Request.DryRun,
+ }
+ }
+
+ default:
+ return nil, fmt.Errorf("unsupported type :%v",
object.GetObjectKind())
+ }
+
+ return &AdmissionReview{
+ TypeMeta: typeMeta,
+ Request: req,
+ Response: resp,
+ }, nil
+}
+
+func AdmissionReviewAdapterToKube(ar *AdmissionReview, apiVersion string)
runtime.Object {
+ var res runtime.Object
+ arRequest := ar.Request
+ arResponse := ar.Response
+ if apiVersion == "" {
+ apiVersion = admissionAPIV1beta1
+ }
+ switch apiVersion {
+ case admissionAPIV1beta1:
+ arv1beta1 := kubeApiAdmissionv1beta1.AdmissionReview{}
+ if arRequest != nil {
+ arv1beta1.Request =
&kubeApiAdmissionv1beta1.AdmissionRequest{
+ UID: arRequest.UID,
+ Kind: arRequest.Kind,
+ Resource: arRequest.Resource,
+ SubResource: arRequest.SubResource,
+ Name: arRequest.Name,
+ Namespace: arRequest.Namespace,
+ RequestKind: arRequest.RequestKind,
+ RequestResource: arRequest.RequestResource,
+ RequestSubResource:
arRequest.RequestSubResource,
+ Operation:
kubeApiAdmissionv1beta1.Operation(arRequest.Operation),
+ UserInfo: arRequest.UserInfo,
+ Object: arRequest.Object,
+ OldObject: arRequest.OldObject,
+ DryRun: arRequest.DryRun,
+ Options: arRequest.Options,
+ }
+ }
+ if arResponse != nil {
+ var patchType *kubeApiAdmissionv1beta1.PatchType
+ if arResponse.PatchType != nil {
+ patchType =
(*kubeApiAdmissionv1beta1.PatchType)(arResponse.PatchType)
+ }
+ arv1beta1.Response =
&kubeApiAdmissionv1beta1.AdmissionResponse{
+ UID: arResponse.UID,
+ Allowed: arResponse.Allowed,
+ Result: arResponse.Result,
+ Patch: arResponse.Patch,
+ PatchType: patchType,
+ AuditAnnotations: arResponse.AuditAnnotations,
+ Warnings: arResponse.Warnings,
+ }
+ }
+ arv1beta1.TypeMeta = ar.TypeMeta
+ res = &arv1beta1
+ case admissionAPIV1:
+ arv1 := admissionv1.AdmissionReview{}
+ if arRequest != nil {
+ arv1.Request = &admissionv1.AdmissionRequest{
+ UID: arRequest.UID,
+ Kind: arRequest.Kind,
+ Resource: arRequest.Resource,
+ SubResource: arRequest.SubResource,
+ Name: arRequest.Name,
+ Namespace: arRequest.Namespace,
+ RequestKind: arRequest.RequestKind,
+ RequestResource: arRequest.RequestResource,
+ RequestSubResource:
arRequest.RequestSubResource,
+ Operation:
admissionv1.Operation(arRequest.Operation),
+ UserInfo: arRequest.UserInfo,
+ Object: arRequest.Object,
+ OldObject: arRequest.OldObject,
+ DryRun: arRequest.DryRun,
+ Options: arRequest.Options,
+ }
+ }
+ if arResponse != nil {
+ var patchType *admissionv1.PatchType
+ if arResponse.PatchType != nil {
+ patchType =
(*admissionv1.PatchType)(arResponse.PatchType)
+ }
+ arv1.Response = &admissionv1.AdmissionResponse{
+ UID: arResponse.UID,
+ Allowed: arResponse.Allowed,
+ Result: arResponse.Result,
+ Patch: arResponse.Patch,
+ PatchType: patchType,
+ AuditAnnotations: arResponse.AuditAnnotations,
+ Warnings: arResponse.Warnings,
+ }
+ }
+ arv1.TypeMeta = ar.TypeMeta
+ res = &arv1
+ }
+ return res
+}
diff --git a/pkg/kube/controllers/common.go b/pkg/kube/controllers/common.go
index b15e8bf0..227fcc94 100644
--- a/pkg/kube/controllers/common.go
+++ b/pkg/kube/controllers/common.go
@@ -115,3 +115,64 @@ func Extract[T Object](obj any) T {
func ExtractObject(obj any) Object {
return Extract[Object](obj)
}
+
+func FilteredObjectSpecHandler(handler func(o Object), filter func(o Object)
bool) cache.ResourceEventHandler {
+ return filteredObjectHandler(handler, true, filter)
+}
+
+func filteredObjectHandler(handler func(o Object), onlyIncludeSpecChanges
bool, filter func(o Object) bool) cache.ResourceEventHandler {
+ single := func(obj any) {
+ o := ExtractObject(obj)
+ if o == nil {
+ return
+ }
+ if !filter(o) {
+ return
+ }
+ handler(o)
+ }
+ return cache.ResourceEventHandlerFuncs{
+ AddFunc: single,
+ UpdateFunc: func(oldInterface, newInterface any) {
+ oldObj := ExtractObject(oldInterface)
+ if oldObj == nil {
+ return
+ }
+ newObj := ExtractObject(newInterface)
+ if newObj == nil {
+ return
+ }
+ if onlyIncludeSpecChanges &&
oldObj.GetResourceVersion() == newObj.GetResourceVersion() {
+ return
+ }
+ newer := filter(newObj)
+ older := filter(oldObj)
+ if !newer && !older {
+ return
+ }
+ handler(newObj)
+ },
+ DeleteFunc: single,
+ }
+}
+
+func ObjectHandler(handler func(o Object)) cache.ResourceEventHandler {
+ return TypedObjectHandler[Object](handler)
+}
+
+func TypedObjectHandler[T ComparableObject](handler func(o T))
cache.ResourceEventHandler {
+ h := func(obj any) {
+ o := Extract[T](obj)
+ if IsNil(o) {
+ return
+ }
+ handler(o)
+ }
+ return cache.ResourceEventHandlerFuncs{
+ AddFunc: h,
+ UpdateFunc: func(oldObj, newObj any) {
+ h(newObj)
+ },
+ DeleteFunc: h,
+ }
+}
diff --git a/pkg/kube/controllers/queue.go b/pkg/kube/controllers/queue.go
new file mode 100644
index 00000000..929045fe
--- /dev/null
+++ b/pkg/kube/controllers/queue.go
@@ -0,0 +1,120 @@
+package controllers
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "go.uber.org/atomic"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/util/workqueue"
+)
+
+type ReconcilerFn func(key types.NamespacedName) error
+
+type Queue struct {
+ queue workqueue.TypedRateLimitingInterface[any]
+ initialSync *atomic.Bool
+ name string
+ maxAttempts int
+ workFn func(key any) error
+ closed chan struct{}
+}
+
+func NewQueue(name string, options ...func(*Queue)) Queue {
+ q := Queue{
+ name: name,
+ closed: make(chan struct{}),
+ initialSync: atomic.NewBool(false),
+ }
+ for _, o := range options {
+ o(&q)
+ }
+ if q.queue == nil {
+ q.queue = workqueue.NewTypedRateLimitingQueueWithConfig[any](
+ workqueue.DefaultTypedControllerRateLimiter[any](),
+ workqueue.TypedRateLimitingQueueConfig[any]{
+ Name: name,
+ MetricsProvider: nil,
+ Clock: nil,
+ DelayingQueue: nil,
+ },
+ )
+ }
+ return q
+}
+
+func (q Queue) AddObject(obj Object) {
+ q.queue.Add(config.NamespacedName(obj))
+}
+
+func WithRateLimiter(r workqueue.TypedRateLimiter[any]) func(q *Queue) {
+ return func(q *Queue) {
+ q.queue = workqueue.NewTypedRateLimitingQueue[any](r)
+ }
+}
+
+func WithMaxAttempts(n int) func(q *Queue) {
+ return func(q *Queue) {
+ q.maxAttempts = n
+ }
+}
+
+func WithReconciler(f ReconcilerFn) func(q *Queue) {
+ return func(q *Queue) {
+ q.workFn = func(key any) error {
+ return f(key.(types.NamespacedName))
+ }
+ }
+}
+
+func (q Queue) ShutDownEarly() {
+ q.queue.ShutDown()
+}
+
+type syncSignal struct{}
+
+var defaultSyncSignal = syncSignal{}
+
+func (q Queue) processNextItem() bool {
+ // Wait until there is a new item in the working queue
+ key, quit := q.queue.Get()
+ if quit {
+ // We are done, signal to exit the queue
+ return false
+ }
+
+ // We got the sync signal. This is not a real event, so we exit early
after signaling we are synced
+ if key == defaultSyncSignal {
+ q.initialSync.Store(true)
+ return true
+ }
+
+ // 'Done marks item as done processing' - should be called at the end
of all processing
+ defer q.queue.Done(key)
+
+ err := q.workFn(key)
+ if err != nil {
+ retryCount := q.queue.NumRequeues(key) + 1
+ if retryCount < q.maxAttempts {
+ q.queue.AddRateLimited(key)
+ // Return early, so we do not call Forget(), allowing
the rate limiting to backoff
+ return true
+ }
+ }
+ // 'Forget indicates that an item is finished being retried.' - should
be called whenever we do not want to backoff on this key.
+ q.queue.Forget(key)
+ return true
+}
+
+func (q Queue) Run(stop <-chan struct{}) {
+ defer q.queue.ShutDown()
+ q.queue.Add(defaultSyncSignal)
+ go func() {
+ // Process updates until we return false, which indicates the
queue is terminated
+ for q.processNextItem() {
+ }
+ close(q.closed)
+ }()
+ select {
+ case <-stop:
+ case <-q.closed:
+ }
+}
diff --git a/pkg/kube/inject/inject.go b/pkg/kube/inject/inject.go
new file mode 100644
index 00000000..d2042f99
--- /dev/null
+++ b/pkg/kube/inject/inject.go
@@ -0,0 +1,65 @@
+package inject
+
+import (
+ "fmt"
+ "github.com/Masterminds/sprig/v3"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/yaml"
+ "text/template"
+)
+
+type (
+ Template *corev1.Pod
+ RawTemplates map[string]string
+ Templates map[string]*template.Template
+)
+
+type Config struct {
+ Templates Templates `json:"-"`
+ RawTemplates RawTemplates `json:"templates"`
+ InjectedAnnotations map[string]string `json:"injectedAnnotations"`
+ DefaultTemplates []string `json:"defaultTemplates"`
+ Aliases map[string][]string `json:"aliases"`
+}
+
+func UnmarshalConfig(yml []byte) (Config, error) {
+ var injectConfig Config
+ if err := yaml.Unmarshal(yml, &injectConfig); err != nil {
+ return injectConfig, fmt.Errorf("failed to unmarshal injection
template: %v", err)
+ }
+
+ var err error
+ injectConfig.Templates, err = ParseTemplates(injectConfig.RawTemplates)
+ if err != nil {
+ return injectConfig, err
+ }
+
+ return injectConfig, nil
+}
+
+func parseDryTemplate(tmplStr string, funcMap map[string]any)
(*template.Template, error) {
+ temp := template.New("inject")
+ t, err := temp.Funcs(sprig.TxtFuncMap()).Funcs(funcMap).Parse(tmplStr)
+ if err != nil {
+ klog.Infof("Failed to parse template: %v %v\n", err, tmplStr)
+ return nil, err
+ }
+
+ return t, nil
+}
+
+func potentialPodName(metadata metav1.ObjectMeta) string {
+ if metadata.Name != "" {
+ return metadata.Name
+ }
+ if metadata.GenerateName != "" {
+ return metadata.GenerateName + "***** (actual name not yet
known)"
+ }
+ return ""
+}
+
+func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod,
templatePod *corev1.Pod, err error) {
+ return mergedPod, templatePod, nil
+}
diff --git a/pkg/kube/inject/template.go b/pkg/kube/inject/template.go
new file mode 100644
index 00000000..62e0b60e
--- /dev/null
+++ b/pkg/kube/inject/template.go
@@ -0,0 +1,11 @@
+package inject
+
+import (
+ "text/template"
+)
+
// InjectionFuncmap holds the template helper functions available to injection
// templates, built once at package load.
var InjectionFuncmap = createInjectionFuncmap()

// createInjectionFuncmap returns the injection helper FuncMap; it is
// currently empty and exists as the extension point for future helpers.
func createInjectionFuncmap() template.FuncMap {
	return template.FuncMap{}
}
diff --git a/pkg/kube/inject/watcher.go b/pkg/kube/inject/watcher.go
new file mode 100644
index 00000000..e7988ef5
--- /dev/null
+++ b/pkg/kube/inject/watcher.go
@@ -0,0 +1,161 @@
+package inject
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/watcher/configmapwatcher"
+ "github.com/fsnotify/fsnotify"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "path/filepath"
+ "time"
+)
+
+type Watcher interface {
+ SetHandler(func(*Config, string) error)
+ Run(<-chan struct{})
+ Get() (*Config, string, error)
+}
+
+type fileWatcher struct {
+ watcher *fsnotify.Watcher
+ configFile string
+ valuesFile string
+ handler func(*Config, string) error
+}
+
+type configMapWatcher struct {
+ c *configmapwatcher.Controller
+ client kube.Client
+ namespace string
+ name string
+ configKey string
+ valuesKey string
+ handler func(*Config, string) error
+}
+
+func NewFileWatcher(configFile, valuesFile string) (Watcher, error) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return nil, err
+ }
+ // watch the parent directory of the target files so we can catch
+ // symlink updates of k8s ConfigMaps volumes.
+ watchDir, _ := filepath.Split(configFile)
+ if err := watcher.Add(watchDir); err != nil {
+ return nil, fmt.Errorf("could not watch %v: %v", watchDir, err)
+ }
+ return &fileWatcher{
+ watcher: watcher,
+ configFile: configFile,
+ valuesFile: valuesFile,
+ }, nil
+}
+
+func (w *fileWatcher) Run(stop <-chan struct{}) {
+ defer w.watcher.Close()
+ var timerC <-chan time.Time
+ for {
+ select {
+ case <-timerC:
+ timerC = nil
+ sidecarConfig, valuesConfig, err := w.Get()
+ if err != nil {
+ klog.Errorf("update error: %v", err)
+ break
+ }
+ if w.handler != nil {
+ if err := w.handler(sidecarConfig,
valuesConfig); err != nil {
+ klog.Errorf("update error: %v", err)
+ }
+ }
+ case event, ok := <-w.watcher.Events:
+ if !ok {
+ return
+ }
+ klog.V(2).Infof("Injector watch update: %+v", event)
+ // use a timer to debounce configuration updates
+ if (event.Has(fsnotify.Write) ||
event.Has(fsnotify.Create)) && timerC == nil {
+ timerC = time.After(watchDebounceDelay)
+ }
+ case err, ok := <-w.watcher.Errors:
+ if !ok {
+ return
+ }
+ klog.Errorf("Watcher error: %v", err)
+ case <-stop:
+ return
+ }
+ }
+}
+
+func (w *fileWatcher) SetHandler(handler func(*Config, string) error) {
+ w.handler = handler
+}
+
+func (w *fileWatcher) Get() (*Config, string, error) {
+ return loadConfig(w.configFile, w.valuesFile)
+}
+
+func (w *configMapWatcher) SetHandler(handler func(*Config, string) error) {
+ w.handler = handler
+}
+
+func (w *configMapWatcher) Run(stop <-chan struct{}) {
+ w.c.Run(stop)
+}
+
+func (w *configMapWatcher) Get() (*Config, string, error) {
+ cms := w.client.Kube().CoreV1().ConfigMaps(w.namespace)
+ cm, err := cms.Get(context.TODO(), w.name, metav1.GetOptions{})
+ if err != nil {
+ return nil, "", err
+ }
+ return readConfigMap(cm, w.configKey, w.valuesKey)
+}
+
+func NewConfigMapWatcher(client kube.Client, namespace, name, configKey,
valuesKey string) Watcher {
+ w := &configMapWatcher{
+ client: client,
+ namespace: namespace,
+ name: name,
+ configKey: configKey,
+ valuesKey: valuesKey,
+ }
+ w.c = configmapwatcher.NewController(client, namespace, name, func(cm
*v1.ConfigMap) {
+ sidecarConfig, valuesConfig, err := readConfigMap(cm,
configKey, valuesKey)
+ if err != nil {
+ klog.Warningf("failed to read injection config from
ConfigMap: %v", err)
+ return
+ }
+ if w.handler != nil {
+ if err := w.handler(sidecarConfig, valuesConfig); err
!= nil {
+ klog.Errorf("update error: %v", err)
+ }
+ }
+ })
+ return w
+}
+
+func readConfigMap(cm *v1.ConfigMap, configKey, valuesKey string) (*Config,
string, error) {
+ if cm == nil {
+ return nil, "", fmt.Errorf("no ConfigMap found")
+ }
+
+ configYaml, exists := cm.Data[configKey]
+ if !exists {
+ return nil, "", fmt.Errorf("missing ConfigMap config key %q",
configKey)
+ }
+ c, err := unmarshalConfig([]byte(configYaml))
+ if err != nil {
+ return nil, "", fmt.Errorf("failed reading config: %v.
YAML:\n%s", err, configYaml)
+ }
+
+ valuesConfig, exists := cm.Data[valuesKey]
+ if !exists {
+ return nil, "", fmt.Errorf("missing ConfigMap values key %q",
valuesKey)
+ }
+ return c, valuesConfig, nil
+}
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
index 5f8858f9..372ea741 100644
--- a/pkg/kube/inject/webhook.go
+++ b/pkg/kube/inject/webhook.go
@@ -1,4 +1,400 @@
package inject
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ opconfig "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ "gomodules.xyz/jsonpatch/v2"
+ meshconfig "istio.io/api/mesh/v1alpha1"
+ admissionv1 "k8s.io/api/admission/v1"
+ kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/serializer"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/klog/v2"
+ "net/http"
+ "os"
+ "sigs.k8s.io/yaml"
+ "strings"
+ "sync"
+ "text/template"
+ "time"
+)
+
+var (
+ runtimeScheme = runtime.NewScheme()
+ codecs = serializer.NewCodecFactory(runtimeScheme)
+ deserializer = codecs.UniversalDeserializer()
+)
+
+func init() {
+ _ = corev1.AddToScheme(runtimeScheme)
+ _ = admissionv1.AddToScheme(runtimeScheme)
+ _ = kubeApiAdmissionv1beta1.AddToScheme(runtimeScheme)
+}
+
+const (
+ watchDebounceDelay = 100 * time.Millisecond
+)
+
type Webhook struct {
+ mu sync.RWMutex
+ watcher Watcher
+ meshConfig *meshconfig.MeshConfig
+ env *model.Environment
+ Config *Config
+ valuesConfig ValuesConfig
+}
+
+func NewWebhook(p WebhookParameters) (*Webhook, error) {
+ if p.Mux == nil {
+ return nil, errors.New("expected mux to be passed, but was not
passed")
+ }
+
+ wh := &Webhook{
+ watcher: p.Watcher,
+ meshConfig: p.Env.Mesh(),
+ env: p.Env,
+ }
+
+ sidecarConfig, valuesConfig, err := p.Watcher.Get()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get initial configuration:
%v", err)
+ }
+ if err := wh.updateConfig(sidecarConfig, valuesConfig); err != nil {
+ return nil, fmt.Errorf("failed to process webhook config: %v",
err)
+ }
+
+ p.Mux.HandleFunc("/inject", wh.serveInject)
+ p.Mux.HandleFunc("/inject/", wh.serveInject)
+
+ p.Env.Watcher.AddMeshHandler(func() {
+ wh.mu.Lock()
+ wh.meshConfig = p.Env.Mesh()
+ wh.mu.Unlock()
+ })
+
+ return wh, nil
+}
+
+type WebhookParameters struct {
+ Watcher Watcher
+ Port int
+ Env *model.Environment
+ Mux *http.ServeMux
+}
+
+type ValuesConfig struct {
+ raw string
+ asStruct *opconfig.Values
+ asMap map[string]any
+}
+
+func loadConfig(injectFile, valuesFile string) (*Config, string, error) {
+ data, err := os.ReadFile(injectFile)
+ if err != nil {
+ return nil, "", err
+ }
+ var c *Config
+ if c, err = unmarshalConfig(data); err != nil {
+ klog.Warningf("Failed to parse injectFile %s", string(data))
+ return nil, "", err
+ }
+
+ valuesConfig, err := os.ReadFile(valuesFile)
+ if err != nil {
+ return nil, "", err
+ }
+ return c, string(valuesConfig), nil
+}
+
+func unmarshalConfig(data []byte) (*Config, error) {
+ c, err := UnmarshalConfig(data)
+ if err != nil {
+ return nil, err
+ }
+ return &c, nil
+}
+
+func (wh *Webhook) updateConfig(sidecarConfig *Config, valuesConfig string)
error {
+ wh.mu.Lock()
+ defer wh.mu.Unlock()
+ wh.Config = sidecarConfig
+ vc, err := NewValuesConfig(valuesConfig)
+ if err != nil {
+ return fmt.Errorf("failed to create new values config: %v", err)
+ }
+ wh.valuesConfig = vc
+ return nil
+}
+
+func NewValuesConfig(v string) (ValuesConfig, error) {
+ c := ValuesConfig{raw: v}
+ valuesStruct := &opconfig.Values{}
+ if err := protomarshal.ApplyYAML(v, valuesStruct); err != nil {
+ return c, fmt.Errorf("could not parse configuration values:
%v", err)
+ }
+ c.asStruct = valuesStruct
+
+ values := map[string]any{}
+ if err := yaml.Unmarshal([]byte(v), &values); err != nil {
+ return c, fmt.Errorf("could not parse configuration values:
%v", err)
+ }
+ c.asMap = values
+ return c, nil
+}
+
+func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
+ var body []byte
+ if r.Body != nil {
+ if data, err := kube.HTTPConfigReader(r); err == nil {
+ body = data
+ } else {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ }
+ if len(body) == 0 {
+ http.Error(w, "no body found", http.StatusBadRequest)
+ return
+ }
+
+ // verify the content type is accurate
+ contentType := r.Header.Get("Content-Type")
+ if contentType != "application/json" {
+ http.Error(w, "invalid Content-Type, want `application/json`",
http.StatusUnsupportedMediaType)
+ return
+ }
+
+ path := ""
+ if r.URL != nil {
+ path = r.URL.Path
+ }
+
+ var reviewResponse *kube.AdmissionResponse
+ var obj runtime.Object
+ var ar *kube.AdmissionReview
+ if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
+ reviewResponse = toAdmissionResponse(err)
+ } else {
+ ar, err = kube.AdmissionReviewKubeToAdapter(out)
+ if err != nil {
+ reviewResponse = toAdmissionResponse(err)
+ } else {
+ reviewResponse = wh.inject(ar, path)
+ }
+ }
+
+ response := kube.AdmissionReview{}
+ response.Response = reviewResponse
+ var responseKube runtime.Object
+ var apiVersion string
+ if ar != nil {
+ apiVersion = ar.APIVersion
+ response.TypeMeta = ar.TypeMeta
+ if response.Response != nil {
+ if ar.Request != nil {
+ response.Response.UID = ar.Request.UID
+ }
+ }
+ }
+ responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
+ resp, err := json.Marshal(responseKube)
+ if err != nil {
+ http.Error(w, fmt.Sprintf("could not encode response: %v",
err), http.StatusInternalServerError)
+ return
+ }
+ if _, err := w.Write(resp); err != nil {
+ klog.Errorf("Could not write response: %v", err)
+ http.Error(w, fmt.Sprintf("could not write response: %v", err),
http.StatusInternalServerError)
+ return
+ }
+}
+
+type InjectionParameters struct {
+ pod *corev1.Pod
+ deployMeta types.NamespacedName
+ namespace *corev1.Namespace
+ nativeSidecar bool
+ typeMeta metav1.TypeMeta
+ templates map[string]*template.Template
+ defaultTemplate []string
+ aliases map[string][]string
+ meshConfig *meshconfig.MeshConfig
+ proxyConfig *meshconfig.ProxyConfig
+ valuesConfig ValuesConfig
+ revision string
+ proxyEnvs map[string]string
+ injectedAnnotations map[string]string
+}
+
+func injectPod(req InjectionParameters) ([]byte, error) {
+ // The patch will be built relative to the initial pod, capture its
current state
+ originalPodSpec, err := json.Marshal(req.pod)
+ if err != nil {
+ return nil, err
+ }
+
+ // Run the injection template, giving us a partial pod spec
+ mergedPod, injectedPodData, err := RunTemplate(req)
+ if err != nil {
+ return nil, fmt.Errorf("failed to run injection template: %v",
err)
+ }
+
+ mergedPod, err = reapplyOverwrittenContainers(mergedPod, req.pod,
injectedPodData, req.proxyConfig)
+ if err != nil {
+ return nil, fmt.Errorf("failed to re apply container: %v", err)
+ }
+
+ // Apply some additional transformations to the pod
+ if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
+ return nil, fmt.Errorf("failed to process pod: %v", err)
+ }
+
+ patch, err := createPatch(mergedPod, originalPodSpec)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create patch: %v", err)
+ }
+
+ return patch, nil
+}
+
+func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod
*corev1.Pod, templatePod *corev1.Pod, proxyConfig *meshconfig.ProxyConfig)
(*corev1.Pod, error) {
+ return finalPod, nil
+}
+
+func createPatch(pod *corev1.Pod, original []byte) ([]byte, error) {
+ reinjected, err := json.Marshal(pod)
+ if err != nil {
+ return nil, err
+ }
+ p, err := jsonpatch.CreatePatch(original, reinjected)
+ if err != nil {
+ return nil, err
+ }
+ return json.Marshal(p)
+}
+
+func postProcessPod(pod *corev1.Pod, injectedPod corev1.Pod, req
InjectionParameters) error {
+ if pod.Annotations == nil {
+ pod.Annotations = map[string]string{}
+ }
+ if pod.Labels == nil {
+ pod.Labels = map[string]string{}
+ }
+ return nil
+}
+
+func parseInjectEnvs(path string) map[string]string {
+ path = strings.TrimSuffix(path, "/")
+ res := func(path string) []string {
+ parts := strings.SplitN(path, "/", 3)
+ var newRes []string
+ if len(parts) == 3 { // If length is less than 3, then the path
is simply "/inject".
+ if strings.HasPrefix(parts[2], ":ENV:") {
+ // Deprecated, not recommended.
+ // Note that this syntax fails validation
when used to set injectionPath (i.e., service.path in mwh).
+ // It doesn't fail validation when used to
set injectionURL, however. K8s bug maybe?
+ pairs := strings.Split(parts[2], ":ENV:")
+ for i := 1; i < len(pairs); i++ { // skip the
first part, it is a nil
+ pair := strings.SplitN(pairs[i], "=", 2)
+ // The first part is the variable name
which can not be empty
+ // the second part is the variable
value which can be empty but has to exist
+ // for example, aaa=bbb, aaa= are
valid, but =aaa or = are not valid, the
+ // invalid ones will be ignored.
+ if len(pair[0]) > 0 && len(pair) == 2 {
+ newRes = append(newRes, pair...)
+ }
+ }
+ return newRes
+ }
+ newRes = strings.Split(parts[2], "/")
+ }
+ for i, value := range newRes {
+ if i%2 != 0 {
+ // Replace --slash-- with / in values.
+ newRes[i] = strings.ReplaceAll(value,
"--slash--", "/")
+ }
+ }
+ return newRes
+ }(path)
+ newEnvs := make(map[string]string)
+
+ for i := 0; i < len(res); i += 2 {
+ k := res[i]
+ if i == len(res)-1 { // ignore the last key without value
+ klog.Warningf("Odd number of inject env entries, ignore
the last key %s\n", k)
+ break
+ }
+ }
+
+ return newEnvs
+}
+
+func (wh *Webhook) inject(ar *kube.AdmissionReview, path string)
*kube.AdmissionResponse {
+ req := ar.Request
+ var pod corev1.Pod
+ if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
+ return toAdmissionResponse(err)
+ }
+
+ pod.ManagedFields = nil
+
+ klog.Infof("Process proxyless injection request")
+
+ proxyConfig := wh.env.GetProxyConfigOrDefault(pod.Namespace,
pod.Labels, pod.Annotations, wh.meshConfig)
+ deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
+ params := InjectionParameters{
+ pod: &pod,
+ deployMeta: deploy,
+ typeMeta: typeMeta,
+ templates: wh.Config.Templates,
+ defaultTemplate: wh.Config.DefaultTemplates,
+ aliases: wh.Config.Aliases,
+ meshConfig: wh.meshConfig,
+ proxyConfig: proxyConfig,
+ valuesConfig: wh.valuesConfig,
+ injectedAnnotations: wh.Config.InjectedAnnotations,
+ proxyEnvs: parseInjectEnvs(path),
+ }
+
+ patchBytes, err := injectPod(params)
+ if err != nil {
+ return toAdmissionResponse(err)
+ }
+ reviewResponse := kube.AdmissionResponse{
+ Allowed: true,
+ Patch: patchBytes,
+ PatchType: func() *string {
+ pt := "JSONPatch"
+ return &pt
+ }(),
+ }
+ return &reviewResponse
+}
+
+func (wh *Webhook) Run(stop <-chan struct{}) {
+ go wh.watcher.Run(stop)
+}
+
+func ParseTemplates(tmpls RawTemplates) (Templates, error) {
+ ret := make(Templates, len(tmpls))
+ for k, t := range tmpls {
+ p, err := parseDryTemplate(t, InjectionFuncmap)
+ if err != nil {
+ return nil, err
+ }
+ ret[k] = p
+ }
+ return ret, nil
+}
+
+func toAdmissionResponse(err error) *kube.AdmissionResponse {
+ return &kube.AdmissionResponse{Result: &metav1.Status{Message:
err.Error()}}
}
diff --git a/pkg/kube/util.go b/pkg/kube/util.go
index 6491fac0..10309ed3 100644
--- a/pkg/kube/util.go
+++ b/pkg/kube/util.go
@@ -18,15 +18,25 @@
package kube
import (
+ "fmt"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ "io"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
+ "net/http"
"os"
+ "regexp"
+ "strings"
)
// cronJobNameRegexp matches Job names stamped out by a CronJob controller:
// the parent CronJob name followed by an 8-10 digit scheduled timestamp.
var cronJobNameRegexp = regexp.MustCompile(`(.+)-\d{8,10}$`)
+
func DefaultRestConfig(kubeconfig, configContext string, fns ...func(config
*rest.Config)) (*rest.Config, error) {
config, err := BuildClientConfig(kubeconfig, configContext)
if err != nil {
@@ -90,3 +100,95 @@ func SetRestDefaults(config *rest.Config) *rest.Config {
}
return config
}
+
// MaxRequestBodyBytes is the largest request body (6 MiB) accepted by
// HTTPConfigReader; larger payloads are rejected.
const MaxRequestBodyBytes = int64(6 * 1024 * 1024)
+
+func HTTPConfigReader(req *http.Request) ([]byte, error) {
+ defer req.Body.Close()
+ lr := &io.LimitedReader{
+ R: req.Body,
+ N: MaxRequestBodyBytes + 1,
+ }
+ data, err := io.ReadAll(lr)
+ if err != nil {
+ return nil, err
+ }
+ if lr.N <= 0 {
+ return nil,
errors.NewRequestEntityTooLargeError(fmt.Sprintf("limit is %d",
MaxRequestBodyBytes))
+ }
+ return data, nil
+}
+
+func GetDeployMetaFromPod(pod *corev1.Pod) (types.NamespacedName,
metav1.TypeMeta) {
+ if pod == nil {
+ return types.NamespacedName{}, metav1.TypeMeta{}
+ }
+ // try to capture more useful namespace/name info for deployments, etc.
+ // TODO(dougreid): expand to enable lookup of OWNERs recursively a la
kubernetesenv
+
+ deployMeta := types.NamespacedName{Name: pod.Name, Namespace:
pod.Namespace}
+
+ typeMetadata := metav1.TypeMeta{
+ Kind: "Pod",
+ APIVersion: "v1",
+ }
+ if len(pod.GenerateName) > 0 {
+ // if the pod name was generated (or is scheduled for
generation), we can begin an investigation into the controlling reference for
the pod.
+ var controllerRef metav1.OwnerReference
+ controllerFound := false
+ for _, ref := range pod.GetOwnerReferences() {
+ if ref.Controller != nil && *ref.Controller {
+ controllerRef = ref
+ controllerFound = true
+ break
+ }
+ }
+ if controllerFound {
+ typeMetadata.APIVersion = controllerRef.APIVersion
+ typeMetadata.Kind = controllerRef.Kind
+
+ // heuristic for deployment detection
+ deployMeta.Name = controllerRef.Name
+ if typeMetadata.Kind == "ReplicaSet" &&
pod.Labels["pod-template-hash"] != "" && strings.HasSuffix(controllerRef.Name,
pod.Labels["pod-template-hash"]) {
+ name := strings.TrimSuffix(controllerRef.Name,
"-"+pod.Labels["pod-template-hash"])
+ deployMeta.Name = name
+ typeMetadata.Kind = "Deployment"
+ } else if typeMetadata.Kind == "ReplicaSet" &&
pod.Labels["rollouts-pod-template-hash"] != "" &&
+ strings.HasSuffix(controllerRef.Name,
pod.Labels["rollouts-pod-template-hash"]) {
+ // Heuristic for ArgoCD Rollout
+ name := strings.TrimSuffix(controllerRef.Name,
"-"+pod.Labels["rollouts-pod-template-hash"])
+ deployMeta.Name = name
+ typeMetadata.Kind = "Rollout"
+ typeMetadata.APIVersion = "v1alpha1"
+ } else if typeMetadata.Kind == "ReplicationController"
&& pod.Labels["deploymentconfig"] != "" {
+ // If the pod is controlled by the replication
controller, which is created by the DeploymentConfig resource in
+ // Openshift platform, set the deploy name to
the deployment config's name, and the kind to 'DeploymentConfig'.
+ //
+ // nolint: lll
+ // For DeploymentConfig details, refer to
+ //
https://docs.openshift.com/container-platform/4.1/applications/deployments/what-deployments-are.html#deployments-and-deploymentconfigs_what-deployments-are
+ //
+ // For the reference to the pod label
'deploymentconfig', refer to
+ //
https://github.com/openshift/library-go/blob/7a65fdb398e28782ee1650959a5e0419121e97ae/pkg/apps/appsutil/const.go#L25
+ deployMeta.Name = pod.Labels["deploymentconfig"]
+ typeMetadata.Kind = "DeploymentConfig"
+ } else if typeMetadata.Kind == "Job" {
+ // If job name suffixed with
`-<digit-timestamp>`, where the length of digit timestamp is 8~10,
+ // trim the suffix and set kind to cron job.
+ if jn :=
cronJobNameRegexp.FindStringSubmatch(controllerRef.Name); len(jn) == 2 {
+ deployMeta.Name = jn[1]
+ typeMetadata.Kind = "CronJob"
+ // heuristically set cron job api
version to v1 as it cannot be derived from pod metadata.
+ typeMetadata.APIVersion = "batch/v1"
+ }
+ }
+ }
+ }
+
+ if deployMeta.Name == "" {
+ // if we haven't been able to extract a deployment name, then
just give it the pod name
+ deployMeta.Name = pod.Name
+ }
+
+ return deployMeta, typeMetadata
+}
diff --git a/pkg/kube/watcher/configmapwatcher/configmapwatcher.go
b/pkg/kube/watcher/configmapwatcher/configmapwatcher.go
new file mode 100644
index 00000000..3225f61e
--- /dev/null
+++ b/pkg/kube/watcher/configmapwatcher/configmapwatcher.go
@@ -0,0 +1,64 @@
+package configmapwatcher
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "go.uber.org/atomic"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/fields"
+ "k8s.io/apimachinery/pkg/types"
+)
+
// Controller watches a single, named ConfigMap and invokes a callback on
// every observed change.
type Controller struct {
	configmaps kclient.Client[*v1.ConfigMap]
	queue      controllers.Queue

	// Identity of the one ConfigMap being watched.
	configMapNamespace string
	configMapName      string
	// callback receives the ConfigMap's current state on each change.
	callback func(*v1.ConfigMap)

	// hasSynced flips to true after the first reconcile completes.
	hasSynced atomic.Bool
}
+
+func NewController(client kube.Client, namespace, name string, callback
func(*v1.ConfigMap)) *Controller {
+ c := &Controller{
+ configMapNamespace: namespace,
+ configMapName: name,
+ callback: callback,
+ }
+
+ c.configmaps = kclient.NewFiltered[*v1.ConfigMap](client,
kclient.Filter{
+ Namespace: namespace,
+ FieldSelector:
fields.OneTermEqualSelector(metav1.ObjectNameField, name).String(),
+ })
+
+ c.queue = controllers.NewQueue("configmap "+name,
controllers.WithReconciler(c.processItem))
+
c.configmaps.AddEventHandler(controllers.FilteredObjectSpecHandler(c.queue.AddObject,
func(o controllers.Object) bool {
+ // Filter out other configmaps
+ return o.GetName() == name && o.GetNamespace() == namespace
+ }))
+
+ return c
+}
+
// processItem is the queue reconciler: it fetches the current state of the
// watched ConfigMap and hands it to the callback.
func (c *Controller) processItem(name types.NamespacedName) error {
	// NOTE(review): Get presumably returns nil once the ConfigMap is deleted,
	// so the callback is expected to handle a nil argument — confirm.
	cm := c.configmaps.Get(name.Name, name.Namespace)
	c.callback(cm)

	c.hasSynced.Store(true)
	return nil
}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+ // Start informer immediately instead of with the rest. This is because
we use configmapwatcher for
+ // single types (so its never shared), and for use cases where we need
the results immediately
+ // during startup.
+ c.configmaps.Start(stop)
+ if !kube.WaitForCacheSync("configmap "+c.configMapName, stop,
c.configmaps.HasSynced) {
+ c.queue.ShutDownEarly()
+ return
+ }
+ c.queue.Run(stop)
+}
diff --git a/pkg/webhooks/util/util.go b/pkg/webhooks/util/util.go
new file mode 100644
index 00000000..bdd099ec
--- /dev/null
+++ b/pkg/webhooks/util/util.go
@@ -0,0 +1,45 @@
+package util
+
+import (
+ "crypto/x509"
+ "encoding/pem"
+ "errors"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+)
+
// ConfigError pairs an error with a short reason string, used when webhook
// configuration (e.g. the CA bundle) fails validation.
type ConfigError struct {
	err    error
	reason string
}

// Error returns the underlying error's message.
func (e ConfigError) Error() string {
	return e.err.Error()
}

// Reason returns the short reason describing why the config is invalid.
func (e ConfigError) Reason() string {
	return e.reason
}
+
+func LoadCABundle(caBundleWatcher *keycertbundle.Watcher) ([]byte, error) {
+ caBundle := caBundleWatcher.GetCABundle()
+ if err := VerifyCABundle(caBundle); err != nil {
+ return nil, &ConfigError{err, "could not verify caBundle"}
+ }
+
+ return caBundle, nil
+}
+
// VerifyCABundle checks that caBundle starts with a PEM CERTIFICATE block
// containing a parseable x509 certificate. Only the first PEM block is
// inspected; any trailing blocks in the bundle are not validated.
func VerifyCABundle(caBundle []byte) error {
	block, _ := pem.Decode(caBundle)
	if block == nil {
		return errors.New("could not decode pem")
	}
	if block.Type != "CERTIFICATE" {
		return fmt.Errorf("cert contains wrong pem type: %q", block.Type)
	}
	if _, err := x509.ParseCertificate(block.Bytes); err != nil {
		// Wrap with %w (not %v) so callers can unwrap the x509 parse error
		// via errors.Is/errors.As; the message text is unchanged.
		return fmt.Errorf("cert contains invalid x509 certificate: %w", err)
	}
	return nil
}
diff --git a/pkg/webhooks/webhookpatch.go b/pkg/webhooks/webhookpatch.go
new file mode 100644
index 00000000..71c7b7b4
--- /dev/null
+++ b/pkg/webhooks/webhookpatch.go
@@ -0,0 +1,142 @@
+package webhooks
+
+import (
+ "bytes"
+ "errors"
+ kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+ "istio.io/api/label"
+ v1 "k8s.io/api/admissionregistration/v1"
+ kerrors "k8s.io/apimachinery/pkg/api/errors"
+ klabels "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/klog/v2"
+ "math"
+ "strings"
+ "time"
+)
+
var (
	// errWrongRevision: the webhook's revision label does not match this
	// patcher's target revision; the webhook should not be patched (no retry).
	errWrongRevision = errors.New("webhook does not belong to target revision")
	// errNotFound: the MutatingWebhookConfiguration no longer exists (no retry).
	errNotFound = errors.New("webhook not found")
	// errNoWebhookWithName: the configuration exists but none of its webhook
	// entries match the target webhook name (no retry).
	errNoWebhookWithName = errors.New("webhook configuration did not contain webhook with target name")
)
+
// WebhookCertPatcher keeps the caBundle of matching MutatingWebhookConfigurations
// in sync with the current CA certificate bundle.
type WebhookCertPatcher struct {
	// revision to patch webhooks for
	revision string
	// webhookName is the name suffix matched against each webhook entry.
	webhookName string

	queue controllers.Queue

	// CABundleWatcher provides the x509 certificate bundle used by the webhook
	// server and patched into the webhook config.
	CABundleWatcher *keycertbundle.Watcher

	webhooks kclient.Client[*v1.MutatingWebhookConfiguration]
}
+
+func NewWebhookCertPatcher(client kubelib.Client, webhookName string,
caBundleWatcher *keycertbundle.Watcher) (*WebhookCertPatcher, error) {
+ p := &WebhookCertPatcher{
+ webhookName: webhookName,
+ CABundleWatcher: caBundleWatcher,
+ }
+ p.queue = newWebhookPatcherQueue(p.webhookPatchTask)
+
+ p.webhooks = kclient.New[*v1.MutatingWebhookConfiguration](client)
+ p.webhooks.AddEventHandler(controllers.ObjectHandler(p.queue.AddObject))
+
+ return p, nil
+}
+
+func newWebhookPatcherQueue(reconciler controllers.ReconcilerFn)
controllers.Queue {
+ return controllers.NewQueue("webhook patcher",
+ controllers.WithReconciler(reconciler),
+
controllers.WithRateLimiter(workqueue.NewTypedItemFastSlowRateLimiter[any](100*time.Millisecond,
1*time.Minute, 5)),
+ controllers.WithMaxAttempts(math.MaxInt))
+}
+
+func (w *WebhookCertPatcher) webhookPatchTask(o types.NamespacedName) error {
+ err := w.patchMutatingWebhookConfig(o.Name)
+
+ // do not want to retry the task if these errors occur, they indicate
that
+ // we should no longer be patching the given webhook
+ if kerrors.IsNotFound(err) || errors.Is(err, errWrongRevision) ||
errors.Is(err, errNoWebhookWithName) || errors.Is(err, errNotFound) {
+ return nil
+ }
+
+ if err != nil {
+ klog.Errorf("patching webhook %s failed: %v", o.Name, err)
+ }
+
+ return err
+}
+
+func (w *WebhookCertPatcher) patchMutatingWebhookConfig(webhookConfigName
string) error {
+ config := w.webhooks.Get(webhookConfigName, "")
+ if config == nil {
+ return errNotFound
+ }
+ // prevents a race condition between multiple istiods when the revision
is changed or modified
+ v, ok := config.Labels[label.IoIstioRev.Name]
+ if !ok {
+ return nil
+ }
+ if v != w.revision {
+ return errWrongRevision
+ }
+
+ found := false
+ updated := false
+ caCertPem, err := util.LoadCABundle(w.CABundleWatcher)
+ if err != nil {
+ klog.Errorf("Failed to load CA bundle: %v", err)
+ return err
+ }
+ for i, wh := range config.Webhooks {
+ if strings.HasSuffix(wh.Name, w.webhookName) {
+ if !bytes.Equal(caCertPem,
config.Webhooks[i].ClientConfig.CABundle) {
+ updated = true
+ }
+ config.Webhooks[i].ClientConfig.CABundle = caCertPem
+ found = true
+ }
+ }
+ if !found {
+ return errNoWebhookWithName
+ }
+
+ if updated {
+ _, err := w.webhooks.Update(config)
+ if err != nil {
+ }
+ }
+
+ return err
+}
+
// Run starts the CA-bundle change watcher and the webhook informer, waits for
// the informer cache to sync, then blocks processing the patch queue until
// stopChan is closed.
func (w *WebhookCertPatcher) Run(stopChan <-chan struct{}) {
	go w.startCaBundleWatcher(stopChan)
	w.webhooks.Start(stopChan)
	kubelib.WaitForCacheSync("webhook patcher", stopChan, w.webhooks.HasSynced)
	w.queue.Run(stopChan)
}
+
// startCaBundleWatcher re-enqueues every known MutatingWebhookConfiguration
// whenever the CA bundle changes, so their caBundle fields get re-patched.
// Runs until stop is closed.
func (w *WebhookCertPatcher) startCaBundleWatcher(stop <-chan struct{}) {
	id, watchCh := w.CABundleWatcher.AddWatcher()
	defer w.CABundleWatcher.RemoveWatcher(id)
	for {
		select {
		case <-watchCh:
			for _, whc := range w.webhooks.List("", klabels.Everything()) {
				w.queue.AddObject(whc)
			}
		case <-stop:
			return
		}
	}
}
diff --git a/sail/cmd/sail-discovery/app/cmd.go
b/sail/cmd/sail-discovery/app/cmd.go
index 12eb57bb..2b0f24dd 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -89,6 +89,9 @@ func newDiscoveryCommand() *cobra.Command {
func addFlags(c *cobra.Command) {
serverArgs = bootstrap.NewSailArgs(func(p *bootstrap.SailArgs) {
p.CtrlZOptions = ctrlz.DefaultOptions()
+ p.InjectionOptions = bootstrap.InjectionOptions{
+ InjectionDirectory: "./var/lib/dubbo/inject",
+ }
})
c.PersistentFlags().StringSliceVar(&serverArgs.RegistryOptions.Registries,
"registries",
[]string{string(provider.Kubernetes)},
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index e4facdc9..d262d4b3 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -39,9 +39,15 @@ type RegistryOptions struct {
ClusterRegistriesNamespace string
}
// InjectionOptions configures where the proxyless injector loads its
// injection templates and values from.
type InjectionOptions struct {
	// Directory of injection related config files.
	InjectionDirectory string
}
+
type SailArgs struct {
ServerOptions DiscoveryServerOptions
RegistryOptions RegistryOptions
+ InjectionOptions InjectionOptions
MeshConfigFile string
NetworksConfigFile string
PodName string
diff --git a/sail/pkg/bootstrap/proxylessinjector.go
b/sail/pkg/bootstrap/proxylessinjector.go
new file mode 100644
index 00000000..6c35b198
--- /dev/null
+++ b/sail/pkg/bootstrap/proxylessinjector.go
@@ -0,0 +1,100 @@
+package bootstrap
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/env"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/inject"
+ "github.com/apache/dubbo-kubernetes/pkg/webhooks"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "os"
+ "path/filepath"
+)
+
const (
	// webhookName is the webhook-entry name suffix matched when patching the
	// mutating webhook configuration's caBundle.
	webhookName = "proxyless-injector.dubbo.io"
	// defaultInjectorConfigMapName is the ConfigMap consulted for injection
	// templates when no local injection directory exists.
	defaultInjectorConfigMapName = "dubbo-proxyless-injector"
)
+
// injectionEnabled gates the mutating webhook handler; set INJECT_ENABLED=false
// to disable injection entirely.
var injectionEnabled = env.Register("INJECT_ENABLED", true, "Enable mutating webhook handler.")
+
// initProxylessInjector wires up the proxyless injection webhook. It returns
// (nil, nil) — not an error — when injection is disabled or no template source
// (local files or in-cluster ConfigMap) is available.
func (s *Server) initProxylessInjector(args *SailArgs) (*inject.Webhook, error) {
	// currently the constant: "./var/lib/dubbo/inject"
	injectPath := args.InjectionOptions.InjectionDirectory
	if injectPath == "" || !injectionEnabled.Get() {
		klog.Infof("Skipping proxyless injector, injection path is missing or disabled.")
		return nil, nil
	}

	// If the injection config exists either locally or remotely, we will set up injection.
	// Local files take precedence over the in-cluster ConfigMap.
	var watcher inject.Watcher
	if _, err := os.Stat(filepath.Join(injectPath, "config")); !os.IsNotExist(err) {
		configFile := filepath.Join(injectPath, "config")
		valuesFile := filepath.Join(injectPath, "values")
		watcher, err = inject.NewFileWatcher(configFile, valuesFile)
		if err != nil {
			return nil, err
		}
	} else if s.kubeClient != nil {
		// Probe for the injector ConfigMap; absence means injection is simply
		// not configured for this install.
		configMapName := getInjectorConfigMapName("")
		cms := s.kubeClient.Kube().CoreV1().ConfigMaps(args.Namespace)
		if _, err := cms.Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {
			if errors.IsNotFound(err) {
				klog.Infof("Skipping proxyless injector, template not found")
				return nil, nil
			}
			return nil, err
		}
		watcher = inject.NewConfigMapWatcher(s.kubeClient, args.Namespace, configMapName, "config", "values")
	} else {
		klog.Infof("Skipping proxyless injector, template not found")
		return nil, nil
	}

	klog.Info("initializing proxyless injector")

	parameters := inject.WebhookParameters{
		Watcher: watcher,
		Env:     s.environment,
		Mux:     s.httpsMux,
	}

	wh, err := inject.NewWebhook(parameters)
	if err != nil {
		return nil, fmt.Errorf("failed to create injection webhook: %v", err)
	}
	// Patch cert if a webhook config name is provided.
	// This requires RBAC permissions - a low-priv Istiod should not attempt to patch but rely on
	// operator or CI/CD
	if features.InjectionWebhookConfigName != "" {
		s.addStartFunc("injection patcher", func(stop <-chan struct{}) error {
			// No leader election - different istiod revisions will patch their own cert.
			// update webhook configuration by watching the cabundle
			patcher, err := webhooks.NewWebhookCertPatcher(s.kubeClient, webhookName, s.dubbodCertBundleWatcher)
			if err != nil {
				klog.Errorf("failed to create webhook cert patcher: %v", err)
				return nil
			}

			go patcher.Run(stop)
			return nil
		})
	}

	s.addStartFunc("injection server", func(stop <-chan struct{}) error {
		wh.Run(stop)
		return nil
	})
	return wh, nil
}
+
+func getInjectorConfigMapName(revision string) string {
+ name := defaultInjectorConfigMapName
+ if revision == "" || revision == "default" {
+ return name
+ }
+ return name + "-" + revision
+}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 1a60f80d..5a427f50 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -50,6 +50,7 @@ import (
caserver "github.com/apache/dubbo-kubernetes/security/pkg/server/ca"
"github.com/fsnotify/fsnotify"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "go.uber.org/atomic"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@@ -111,9 +112,16 @@ type Server struct {
readinessProbes map[string]readinessProbe
+ readinessFlags *readinessFlags
+
webhookInfo *webhookInfo
}
// readinessFlags records which webhook subsystems have completed
// initialization; the flags are set during NewServer and are intended for
// readiness reporting.
type readinessFlags struct {
	proxylessInjectorReady atomic.Bool
	configValidationReady  atomic.Bool
}
+
type webhookInfo struct {
mu sync.RWMutex
wh *inject.Webhook
@@ -139,6 +147,8 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server))
(*Server, error) {
dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
fileWatcher: filewatcher.NewWatcher(),
internalStop: make(chan struct{}),
+ readinessFlags: &readinessFlags{},
+ webhookInfo: &webhookInfo{},
}
for _, fn := range initFuncs {
fn(s)
@@ -216,6 +226,15 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server))
(*Server, error) {
if s.kubeClient != nil {
s.initSecureWebhookServer(args)
+ wh, err := s.initProxylessInjector(args)
+ if err != nil {
+ return nil, fmt.Errorf("error initializing proxyless
injector: %v", err)
+ }
+ s.readinessFlags.proxylessInjectorReady.Store(true)
+ s.webhookInfo.mu.Lock()
+ s.webhookInfo.wh = wh
+ s.webhookInfo.mu.Unlock()
+
if err := s.initConfigValidation(args); err != nil {
return nil, fmt.Errorf("error initializing config
validator: %v", err)
}
diff --git a/sail/pkg/features/sail.go b/sail/pkg/features/sail.go
index 08213a17..f0a22a5c 100644
--- a/sail/pkg/features/sail.go
+++ b/sail/pkg/features/sail.go
@@ -62,4 +62,6 @@ var (
DubbodServiceCustomHost = env.Register("DUBBOD_CUSTOM_HOST", "",
"Custom host name of dubbod that dubbod signs the server cert.
"+
"Multiple custom host names are supported, and multiple
values are separated by commas.").Get()
+ InjectionWebhookConfigName =
env.Register("INJECTION_WEBHOOK_CONFIG_NAME", "istio-sidecar-injector",
+ "Name of the mutatingwebhookconfiguration to patch, if istioctl
is not used.").Get()
)
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 98c0bd75..35e39be1 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -100,6 +100,10 @@ func (e *Environment) GetDiscoveryAddress() (host.Name,
string, error) {
return host.Name(hostname), port, nil
}
// GetProxyConfigOrDefault returns the effective proxy config for a workload.
// NOTE(review): currently a stub — ns, labels, annotations, and meshConfig are
// ignored and the mesh-wide default is always returned; per-workload overrides
// are not yet implemented.
func (e *Environment) GetProxyConfigOrDefault(ns string, labels, annotations map[string]string, meshConfig *meshconfig.MeshConfig) *meshconfig.ProxyConfig {
	return mesh.DefaultProxyConfig()
}
+
func (e *Environment) ClusterLocal() ClusterLocalProvider {
return e.clusterLocalServices
}