This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new ca8d70b4 [discovery] add proxyless webhook inject code (#798)
ca8d70b4 is described below

commit ca8d70b47542e923e6395d5f93b314652f28f28d
Author: Jian Zhong <[email protected]>
AuthorDate: Wed Oct 1 00:40:46 2025 +0800

    [discovery] add proxyless webhook inject code (#798)
---
 go.mod                                             |   6 +-
 go.sum                                             |   6 +-
 pkg/kube/adapter.go                                | 226 ++++++++++++
 pkg/kube/controllers/common.go                     |  61 ++++
 pkg/kube/controllers/queue.go                      | 120 +++++++
 pkg/kube/inject/inject.go                          |  65 ++++
 pkg/kube/inject/template.go                        |  11 +
 pkg/kube/inject/watcher.go                         | 161 +++++++++
 pkg/kube/inject/webhook.go                         | 396 +++++++++++++++++++++
 pkg/kube/util.go                                   | 102 ++++++
 .../watcher/configmapwatcher/configmapwatcher.go   |  64 ++++
 pkg/webhooks/util/util.go                          |  45 +++
 pkg/webhooks/webhookpatch.go                       | 142 ++++++++
 sail/cmd/sail-discovery/app/cmd.go                 |   3 +
 sail/pkg/bootstrap/options.go                      |   6 +
 sail/pkg/bootstrap/proxylessinjector.go            | 100 ++++++
 sail/pkg/bootstrap/server.go                       |  19 +
 sail/pkg/features/sail.go                          |   2 +
 sail/pkg/model/context.go                          |   4 +
 19 files changed, 1534 insertions(+), 5 deletions(-)

diff --git a/go.mod b/go.mod
index a6b993a4..29fc61a5 100644
--- a/go.mod
+++ b/go.mod
@@ -19,9 +19,11 @@ go 1.24.0
 
 require (
        github.com/AlecAivazis/survey/v2 v2.3.7
+       github.com/Masterminds/sprig/v3 v3.3.0
        github.com/Microsoft/go-winio v0.6.2
        github.com/buildpacks/pack v0.30.0
        github.com/cenkalti/backoff/v4 v4.3.0
+       github.com/cespare/xxhash/v2 v2.3.0
        github.com/cheggaaa/pb/v3 v3.1.7
        github.com/chzyer/readline v1.5.1
        github.com/containers/image/v5 v5.34.0
@@ -45,7 +47,6 @@ require (
        github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
        github.com/hashicorp/go-multierror v1.1.1
-       github.com/hashicorp/golang-lru/v2 v2.0.5
        github.com/heroku/color v0.0.6
        github.com/moby/term v0.5.2
        github.com/ory/viper v1.7.5
@@ -61,6 +62,7 @@ require (
        golang.org/x/sys v0.35.0
        golang.org/x/term v0.34.0
        golang.org/x/time v0.12.0
+       gomodules.xyz/jsonpatch/v2 v2.5.0
        google.golang.org/grpc v1.74.2
        google.golang.org/protobuf v1.36.7
        gopkg.in/yaml.v3 v3.0.1
@@ -94,7 +96,6 @@ require (
        github.com/Masterminds/goutils v1.1.1 // indirect
        github.com/Masterminds/semver v1.5.0 // indirect
        github.com/Masterminds/semver/v3 v3.4.0 // indirect
-       github.com/Masterminds/sprig/v3 v3.3.0 // indirect
        github.com/OneOfOne/xxhash v1.2.8 // indirect
        github.com/ProtonMail/go-crypto v1.1.3 // indirect
        github.com/VividCortex/ewma v1.2.0 // indirect
@@ -121,7 +122,6 @@ require (
        github.com/buildpacks/imgutil v0.0.0-20230626185301-726f02e4225c // 
indirect
        github.com/buildpacks/lifecycle v0.17.0 // indirect
        github.com/cespare/xxhash v1.1.0 // indirect
-       github.com/cespare/xxhash/v2 v2.3.0 // indirect
        github.com/chrismellard/docker-credential-acr-env 
v0.0.0-20230304212654-82a0ddb27589 // indirect
        github.com/cloudflare/circl v1.3.7 // indirect
        github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect
diff --git a/go.sum b/go.sum
index 5fc89569..b49d760b 100644
--- a/go.sum
+++ b/go.sum
@@ -255,6 +255,8 @@ github.com/envoyproxy/go-control-plane/envoy 
v1.32.5-0.20250627145903-197b96a9c7
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/envoyproxy/protoc-gen-validate v1.2.1 
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
 github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod 
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
+github.com/evanphx/json-patch v5.9.11+incompatible 
h1:ixHHqfcGvxhWkniF1tWxBHA0yb4Z+d1UQi45df52xW8=
+github.com/evanphx/json-patch v5.9.11+incompatible/go.mod 
h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
 github.com/fatih/color v1.7.0/go.mod 
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
 github.com/fatih/color v1.18.0/go.mod 
h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
@@ -392,8 +394,6 @@ github.com/hashicorp/errwrap v1.1.0 
h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
 github.com/hashicorp/errwrap v1.1.0/go.mod 
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-multierror v1.1.1 
h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
 github.com/hashicorp/go-multierror v1.1.1/go.mod 
h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
-github.com/hashicorp/golang-lru/v2 v2.0.5 
h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
-github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod 
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod 
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/heroku/color v0.0.6 h1:UTFFMrmMLFcL3OweqP1lAdp8i1y/9oHqkeHjQ/b/Ny0=
@@ -808,6 +808,8 @@ golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gomodules.xyz/jsonpatch/v2 v2.5.0 
h1:JELs8RLM12qJGXU4u/TO3V25KW8GreMKl9pdkk14RM0=
+gomodules.xyz/jsonpatch/v2 v2.5.0/go.mod 
h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
 google.golang.org/api v0.183.0 h1:PNMeRDwo1pJdgNcFQ9GstuLe/noWKIc89pRWRLMvLwE=
 google.golang.org/api v0.183.0/go.mod 
h1:q43adC5/pHoSZTx5h2mSmdF7NcyfW9JuDyIOJAgS9ZQ=
 google.golang.org/appengine v1.1.0/go.mod 
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
diff --git a/pkg/kube/adapter.go b/pkg/kube/adapter.go
new file mode 100644
index 00000000..b6a84862
--- /dev/null
+++ b/pkg/kube/adapter.go
@@ -0,0 +1,226 @@
+package kube
+
+import (
+       "fmt"
+       admissionv1 "k8s.io/api/admission/v1"
+       kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
+       authenticationv1 "k8s.io/api/authentication/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
+)
+
+const (
+       // APIVersion constants
+       admissionAPIV1      = "admission.k8s.io/v1"
+       admissionAPIV1beta1 = "admission.k8s.io/v1beta1"
+
+       // Operation constants
+       Create  string = "CREATE"
+       Update  string = "UPDATE"
+       Delete  string = "DELETE"
+       Connect string = "CONNECT"
+)
+
+type AdmissionResponse struct {
+       UID              types.UID         `json:"uid"`
+       Allowed          bool              `json:"allowed"`
+       Result           *metav1.Status    `json:"status,omitempty"`
+       Patch            []byte            `json:"patch,omitempty"`
+       PatchType        *string           `json:"patchType,omitempty"`
+       AuditAnnotations map[string]string `json:"auditAnnotations,omitempty"`
+       Warnings         []string          `json:"warnings,omitempty"`
+}
+
+type AdmissionRequest struct {
+       UID                types.UID                    `json:"uid"`
+       Kind               metav1.GroupVersionKind      `json:"kind"`
+       Resource           metav1.GroupVersionResource  `json:"resource"`
+       SubResource        string                       
`json:"subResource,omitempty"`
+       RequestSubResource string                       
`json:"requestSubResource,omitempty"`
+       RequestKind        *metav1.GroupVersionKind     
`json:"requestKind,omitempty"`
+       RequestResource    *metav1.GroupVersionResource 
`json:"requestResource,omitempty"`
+       UserInfo           authenticationv1.UserInfo    `json:"userInfo"`
+       Name               string                       `json:"name,omitempty"`
+       Namespace          string                       
`json:"namespace,omitempty"`
+       Operation          string                       `json:"operation"`
+       Object             runtime.RawExtension         
`json:"object,omitempty"`
+       OldObject          runtime.RawExtension         
`json:"oldObject,omitempty"`
+       DryRun             *bool                        
`json:"dryRun,omitempty"`
+       Options            runtime.RawExtension         
`json:"options,omitempty"`
+}
+
+type AdmissionReview struct {
+       metav1.TypeMeta
+       Request  *AdmissionRequest  `json:"request,omitempty"`
+       Response *AdmissionResponse `json:"response,omitempty"`
+}
+
+func AdmissionReviewKubeToAdapter(object runtime.Object) (*AdmissionReview, 
error) {
+       var typeMeta metav1.TypeMeta
+       var req *AdmissionRequest
+       var resp *AdmissionResponse
+       switch obj := object.(type) {
+       case *kubeApiAdmissionv1beta1.AdmissionReview:
+               typeMeta = obj.TypeMeta
+               arv1beta1Response := obj.Response
+               arv1beta1Request := obj.Request
+               if arv1beta1Response != nil {
+                       resp = &AdmissionResponse{
+                               UID:      arv1beta1Response.UID,
+                               Allowed:  arv1beta1Response.Allowed,
+                               Result:   arv1beta1Response.Result,
+                               Patch:    arv1beta1Response.Patch,
+                               Warnings: arv1beta1Response.Warnings,
+                       }
+                       if arv1beta1Response.PatchType != nil {
+                               patchType := 
string(*arv1beta1Response.PatchType)
+                               resp.PatchType = &patchType
+                       }
+               }
+               if arv1beta1Request != nil {
+                       req = &AdmissionRequest{
+                               UID:       arv1beta1Request.UID,
+                               Kind:      arv1beta1Request.Kind,
+                               Resource:  arv1beta1Request.Resource,
+                               UserInfo:  arv1beta1Request.UserInfo,
+                               Name:      arv1beta1Request.Name,
+                               Namespace: arv1beta1Request.Namespace,
+                               Operation: string(arv1beta1Request.Operation),
+                               Object:    arv1beta1Request.Object,
+                               OldObject: arv1beta1Request.OldObject,
+                               DryRun:    arv1beta1Request.DryRun,
+                       }
+               }
+
+       case *admissionv1.AdmissionReview:
+               typeMeta = obj.TypeMeta
+               arv1Response := obj.Response
+               arv1Request := obj.Request
+               if arv1Response != nil {
+                       resp = &AdmissionResponse{
+                               UID:      arv1Response.UID,
+                               Allowed:  arv1Response.Allowed,
+                               Result:   arv1Response.Result,
+                               Patch:    arv1Response.Patch,
+                               Warnings: arv1Response.Warnings,
+                       }
+                       if arv1Response.PatchType != nil {
+                               patchType := string(*arv1Response.PatchType)
+                               resp.PatchType = &patchType
+                       }
+               }
+
+               if arv1Request != nil {
+                       req = &AdmissionRequest{
+                               UID:       arv1Request.UID,
+                               Kind:      arv1Request.Kind,
+                               Resource:  arv1Request.Resource,
+                               UserInfo:  arv1Request.UserInfo,
+                               Name:      arv1Request.Name,
+                               Namespace: arv1Request.Namespace,
+                               Operation: string(arv1Request.Operation),
+                               Object:    arv1Request.Object,
+                               OldObject: arv1Request.OldObject,
+                               DryRun:    arv1Request.DryRun,
+                       }
+               }
+
+       default:
+               return nil, fmt.Errorf("unsupported type :%v", 
object.GetObjectKind())
+       }
+
+       return &AdmissionReview{
+               TypeMeta: typeMeta,
+               Request:  req,
+               Response: resp,
+       }, nil
+}
+
+func AdmissionReviewAdapterToKube(ar *AdmissionReview, apiVersion string) 
runtime.Object {
+       var res runtime.Object
+       arRequest := ar.Request
+       arResponse := ar.Response
+       if apiVersion == "" {
+               apiVersion = admissionAPIV1beta1
+       }
+       switch apiVersion {
+       case admissionAPIV1beta1:
+               arv1beta1 := kubeApiAdmissionv1beta1.AdmissionReview{}
+               if arRequest != nil {
+                       arv1beta1.Request = 
&kubeApiAdmissionv1beta1.AdmissionRequest{
+                               UID:                arRequest.UID,
+                               Kind:               arRequest.Kind,
+                               Resource:           arRequest.Resource,
+                               SubResource:        arRequest.SubResource,
+                               Name:               arRequest.Name,
+                               Namespace:          arRequest.Namespace,
+                               RequestKind:        arRequest.RequestKind,
+                               RequestResource:    arRequest.RequestResource,
+                               RequestSubResource: 
arRequest.RequestSubResource,
+                               Operation:          
kubeApiAdmissionv1beta1.Operation(arRequest.Operation),
+                               UserInfo:           arRequest.UserInfo,
+                               Object:             arRequest.Object,
+                               OldObject:          arRequest.OldObject,
+                               DryRun:             arRequest.DryRun,
+                               Options:            arRequest.Options,
+                       }
+               }
+               if arResponse != nil {
+                       var patchType *kubeApiAdmissionv1beta1.PatchType
+                       if arResponse.PatchType != nil {
+                               patchType = 
(*kubeApiAdmissionv1beta1.PatchType)(arResponse.PatchType)
+                       }
+                       arv1beta1.Response = 
&kubeApiAdmissionv1beta1.AdmissionResponse{
+                               UID:              arResponse.UID,
+                               Allowed:          arResponse.Allowed,
+                               Result:           arResponse.Result,
+                               Patch:            arResponse.Patch,
+                               PatchType:        patchType,
+                               AuditAnnotations: arResponse.AuditAnnotations,
+                               Warnings:         arResponse.Warnings,
+                       }
+               }
+               arv1beta1.TypeMeta = ar.TypeMeta
+               res = &arv1beta1
+       case admissionAPIV1:
+               arv1 := admissionv1.AdmissionReview{}
+               if arRequest != nil {
+                       arv1.Request = &admissionv1.AdmissionRequest{
+                               UID:                arRequest.UID,
+                               Kind:               arRequest.Kind,
+                               Resource:           arRequest.Resource,
+                               SubResource:        arRequest.SubResource,
+                               Name:               arRequest.Name,
+                               Namespace:          arRequest.Namespace,
+                               RequestKind:        arRequest.RequestKind,
+                               RequestResource:    arRequest.RequestResource,
+                               RequestSubResource: 
arRequest.RequestSubResource,
+                               Operation:          
admissionv1.Operation(arRequest.Operation),
+                               UserInfo:           arRequest.UserInfo,
+                               Object:             arRequest.Object,
+                               OldObject:          arRequest.OldObject,
+                               DryRun:             arRequest.DryRun,
+                               Options:            arRequest.Options,
+                       }
+               }
+               if arResponse != nil {
+                       var patchType *admissionv1.PatchType
+                       if arResponse.PatchType != nil {
+                               patchType = 
(*admissionv1.PatchType)(arResponse.PatchType)
+                       }
+                       arv1.Response = &admissionv1.AdmissionResponse{
+                               UID:              arResponse.UID,
+                               Allowed:          arResponse.Allowed,
+                               Result:           arResponse.Result,
+                               Patch:            arResponse.Patch,
+                               PatchType:        patchType,
+                               AuditAnnotations: arResponse.AuditAnnotations,
+                               Warnings:         arResponse.Warnings,
+                       }
+               }
+               arv1.TypeMeta = ar.TypeMeta
+               res = &arv1
+       }
+       return res
+}
diff --git a/pkg/kube/controllers/common.go b/pkg/kube/controllers/common.go
index b15e8bf0..227fcc94 100644
--- a/pkg/kube/controllers/common.go
+++ b/pkg/kube/controllers/common.go
@@ -115,3 +115,64 @@ func Extract[T Object](obj any) T {
 func ExtractObject(obj any) Object {
        return Extract[Object](obj)
 }
+
+func FilteredObjectSpecHandler(handler func(o Object), filter func(o Object) 
bool) cache.ResourceEventHandler {
+       return filteredObjectHandler(handler, true, filter)
+}
+
+func filteredObjectHandler(handler func(o Object), onlyIncludeSpecChanges 
bool, filter func(o Object) bool) cache.ResourceEventHandler {
+       single := func(obj any) {
+               o := ExtractObject(obj)
+               if o == nil {
+                       return
+               }
+               if !filter(o) {
+                       return
+               }
+               handler(o)
+       }
+       return cache.ResourceEventHandlerFuncs{
+               AddFunc: single,
+               UpdateFunc: func(oldInterface, newInterface any) {
+                       oldObj := ExtractObject(oldInterface)
+                       if oldObj == nil {
+                               return
+                       }
+                       newObj := ExtractObject(newInterface)
+                       if newObj == nil {
+                               return
+                       }
+                       if onlyIncludeSpecChanges && 
oldObj.GetResourceVersion() == newObj.GetResourceVersion() {
+                               return
+                       }
+                       newer := filter(newObj)
+                       older := filter(oldObj)
+                       if !newer && !older {
+                               return
+                       }
+                       handler(newObj)
+               },
+               DeleteFunc: single,
+       }
+}
+
+func ObjectHandler(handler func(o Object)) cache.ResourceEventHandler {
+       return TypedObjectHandler[Object](handler)
+}
+
+func TypedObjectHandler[T ComparableObject](handler func(o T)) 
cache.ResourceEventHandler {
+       h := func(obj any) {
+               o := Extract[T](obj)
+               if IsNil(o) {
+                       return
+               }
+               handler(o)
+       }
+       return cache.ResourceEventHandlerFuncs{
+               AddFunc: h,
+               UpdateFunc: func(oldObj, newObj any) {
+                       h(newObj)
+               },
+               DeleteFunc: h,
+       }
+}
diff --git a/pkg/kube/controllers/queue.go b/pkg/kube/controllers/queue.go
new file mode 100644
index 00000000..929045fe
--- /dev/null
+++ b/pkg/kube/controllers/queue.go
@@ -0,0 +1,120 @@
+package controllers
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/config"
+       "go.uber.org/atomic"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/client-go/util/workqueue"
+)
+
+type ReconcilerFn func(key types.NamespacedName) error
+
+type Queue struct {
+       queue       workqueue.TypedRateLimitingInterface[any]
+       initialSync *atomic.Bool
+       name        string
+       maxAttempts int
+       workFn      func(key any) error
+       closed      chan struct{}
+}
+
+func NewQueue(name string, options ...func(*Queue)) Queue {
+       q := Queue{
+               name:        name,
+               closed:      make(chan struct{}),
+               initialSync: atomic.NewBool(false),
+       }
+       for _, o := range options {
+               o(&q)
+       }
+       if q.queue == nil {
+               q.queue = workqueue.NewTypedRateLimitingQueueWithConfig[any](
+                       workqueue.DefaultTypedControllerRateLimiter[any](),
+                       workqueue.TypedRateLimitingQueueConfig[any]{
+                               Name:            name,
+                               MetricsProvider: nil,
+                               Clock:           nil,
+                               DelayingQueue:   nil,
+                       },
+               )
+       }
+       return q
+}
+
+func (q Queue) AddObject(obj Object) {
+       q.queue.Add(config.NamespacedName(obj))
+}
+
+func WithRateLimiter(r workqueue.TypedRateLimiter[any]) func(q *Queue) {
+       return func(q *Queue) {
+               q.queue = workqueue.NewTypedRateLimitingQueue[any](r)
+       }
+}
+
+func WithMaxAttempts(n int) func(q *Queue) {
+       return func(q *Queue) {
+               q.maxAttempts = n
+       }
+}
+
+func WithReconciler(f ReconcilerFn) func(q *Queue) {
+       return func(q *Queue) {
+               q.workFn = func(key any) error {
+                       return f(key.(types.NamespacedName))
+               }
+       }
+}
+
+func (q Queue) ShutDownEarly() {
+       q.queue.ShutDown()
+}
+
+type syncSignal struct{}
+
+var defaultSyncSignal = syncSignal{}
+
+func (q Queue) processNextItem() bool {
+       // Wait until there is a new item in the working queue
+       key, quit := q.queue.Get()
+       if quit {
+               // We are done, signal to exit the queue
+               return false
+       }
+
+       // We got the sync signal. This is not a real event, so we exit early 
after signaling we are synced
+       if key == defaultSyncSignal {
+               q.initialSync.Store(true)
+               return true
+       }
+
+       // 'Done marks item as done processing' - should be called at the end 
of all processing
+       defer q.queue.Done(key)
+
+       err := q.workFn(key)
+       if err != nil {
+               retryCount := q.queue.NumRequeues(key) + 1
+               if retryCount < q.maxAttempts {
+                       q.queue.AddRateLimited(key)
+                       // Return early, so we do not call Forget(), allowing 
the rate limiting to backoff
+                       return true
+               }
+       }
+       // 'Forget indicates that an item is finished being retried.' - should 
be called whenever we do not want to backoff on this key.
+       q.queue.Forget(key)
+       return true
+}
+
+func (q Queue) Run(stop <-chan struct{}) {
+       defer q.queue.ShutDown()
+       q.queue.Add(defaultSyncSignal)
+       go func() {
+               // Process updates until we return false, which indicates the 
queue is terminated
+               for q.processNextItem() {
+               }
+               close(q.closed)
+       }()
+       select {
+       case <-stop:
+       case <-q.closed:
+       }
+}
diff --git a/pkg/kube/inject/inject.go b/pkg/kube/inject/inject.go
new file mode 100644
index 00000000..d2042f99
--- /dev/null
+++ b/pkg/kube/inject/inject.go
@@ -0,0 +1,65 @@
+package inject
+
+import (
+       "fmt"
+       "github.com/Masterminds/sprig/v3"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/klog/v2"
+       "sigs.k8s.io/yaml"
+       "text/template"
+)
+
+type (
+       Template     *corev1.Pod
+       RawTemplates map[string]string
+       Templates    map[string]*template.Template
+)
+
+type Config struct {
+       Templates           Templates           `json:"-"`
+       RawTemplates        RawTemplates        `json:"templates"`
+       InjectedAnnotations map[string]string   `json:"injectedAnnotations"`
+       DefaultTemplates    []string            `json:"defaultTemplates"`
+       Aliases             map[string][]string `json:"aliases"`
+}
+
+func UnmarshalConfig(yml []byte) (Config, error) {
+       var injectConfig Config
+       if err := yaml.Unmarshal(yml, &injectConfig); err != nil {
+               return injectConfig, fmt.Errorf("failed to unmarshal injection 
template: %v", err)
+       }
+
+       var err error
+       injectConfig.Templates, err = ParseTemplates(injectConfig.RawTemplates)
+       if err != nil {
+               return injectConfig, err
+       }
+
+       return injectConfig, nil
+}
+
+func parseDryTemplate(tmplStr string, funcMap map[string]any) 
(*template.Template, error) {
+       temp := template.New("inject")
+       t, err := temp.Funcs(sprig.TxtFuncMap()).Funcs(funcMap).Parse(tmplStr)
+       if err != nil {
+               klog.Infof("Failed to parse template: %v %v\n", err, tmplStr)
+               return nil, err
+       }
+
+       return t, nil
+}
+
+func potentialPodName(metadata metav1.ObjectMeta) string {
+       if metadata.Name != "" {
+               return metadata.Name
+       }
+       if metadata.GenerateName != "" {
+               return metadata.GenerateName + "***** (actual name not yet 
known)"
+       }
+       return ""
+}
+
+func RunTemplate(params InjectionParameters) (mergedPod *corev1.Pod, 
templatePod *corev1.Pod, err error) {
+       return mergedPod, templatePod, nil
+}
diff --git a/pkg/kube/inject/template.go b/pkg/kube/inject/template.go
new file mode 100644
index 00000000..62e0b60e
--- /dev/null
+++ b/pkg/kube/inject/template.go
@@ -0,0 +1,11 @@
+package inject
+
+import (
+       "text/template"
+)
+
+var InjectionFuncmap = createInjectionFuncmap()
+
+func createInjectionFuncmap() template.FuncMap {
+       return template.FuncMap{}
+}
diff --git a/pkg/kube/inject/watcher.go b/pkg/kube/inject/watcher.go
new file mode 100644
index 00000000..e7988ef5
--- /dev/null
+++ b/pkg/kube/inject/watcher.go
@@ -0,0 +1,161 @@
+package inject
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/watcher/configmapwatcher"
+       "github.com/fsnotify/fsnotify"
+       v1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/klog/v2"
+       "path/filepath"
+       "time"
+)
+
+type Watcher interface {
+       SetHandler(func(*Config, string) error)
+       Run(<-chan struct{})
+       Get() (*Config, string, error)
+}
+
+type fileWatcher struct {
+       watcher    *fsnotify.Watcher
+       configFile string
+       valuesFile string
+       handler    func(*Config, string) error
+}
+
+type configMapWatcher struct {
+       c         *configmapwatcher.Controller
+       client    kube.Client
+       namespace string
+       name      string
+       configKey string
+       valuesKey string
+       handler   func(*Config, string) error
+}
+
+func NewFileWatcher(configFile, valuesFile string) (Watcher, error) {
+       watcher, err := fsnotify.NewWatcher()
+       if err != nil {
+               return nil, err
+       }
+       // watch the parent directory of the target files so we can catch
+       // symlink updates of k8s ConfigMaps volumes.
+       watchDir, _ := filepath.Split(configFile)
+       if err := watcher.Add(watchDir); err != nil {
+               return nil, fmt.Errorf("could not watch %v: %v", watchDir, err)
+       }
+       return &fileWatcher{
+               watcher:    watcher,
+               configFile: configFile,
+               valuesFile: valuesFile,
+       }, nil
+}
+
+func (w *fileWatcher) Run(stop <-chan struct{}) {
+       defer w.watcher.Close()
+       var timerC <-chan time.Time
+       for {
+               select {
+               case <-timerC:
+                       timerC = nil
+                       sidecarConfig, valuesConfig, err := w.Get()
+                       if err != nil {
+                               klog.Errorf("update error: %v", err)
+                               break
+                       }
+                       if w.handler != nil {
+                               if err := w.handler(sidecarConfig, 
valuesConfig); err != nil {
+                                       klog.Errorf("update error: %v", err)
+                               }
+                       }
+               case event, ok := <-w.watcher.Events:
+                       if !ok {
+                               return
+                       }
+                       klog.V(2).Infof("Injector watch update: %+v", event)
+                       // use a timer to debounce configuration updates
+                       if (event.Has(fsnotify.Write) || 
event.Has(fsnotify.Create)) && timerC == nil {
+                               timerC = time.After(watchDebounceDelay)
+                       }
+               case err, ok := <-w.watcher.Errors:
+                       if !ok {
+                               return
+                       }
+                       klog.Errorf("Watcher error: %v", err)
+               case <-stop:
+                       return
+               }
+       }
+}
+
+func (w *fileWatcher) SetHandler(handler func(*Config, string) error) {
+       w.handler = handler
+}
+
+func (w *fileWatcher) Get() (*Config, string, error) {
+       return loadConfig(w.configFile, w.valuesFile)
+}
+
+func (w *configMapWatcher) SetHandler(handler func(*Config, string) error) {
+       w.handler = handler
+}
+
+func (w *configMapWatcher) Run(stop <-chan struct{}) {
+       w.c.Run(stop)
+}
+
+func (w *configMapWatcher) Get() (*Config, string, error) {
+       cms := w.client.Kube().CoreV1().ConfigMaps(w.namespace)
+       cm, err := cms.Get(context.TODO(), w.name, metav1.GetOptions{})
+       if err != nil {
+               return nil, "", err
+       }
+       return readConfigMap(cm, w.configKey, w.valuesKey)
+}
+
+func NewConfigMapWatcher(client kube.Client, namespace, name, configKey, 
valuesKey string) Watcher {
+       w := &configMapWatcher{
+               client:    client,
+               namespace: namespace,
+               name:      name,
+               configKey: configKey,
+               valuesKey: valuesKey,
+       }
+       w.c = configmapwatcher.NewController(client, namespace, name, func(cm 
*v1.ConfigMap) {
+               sidecarConfig, valuesConfig, err := readConfigMap(cm, 
configKey, valuesKey)
+               if err != nil {
+                       klog.Warningf("failed to read injection config from 
ConfigMap: %v", err)
+                       return
+               }
+               if w.handler != nil {
+                       if err := w.handler(sidecarConfig, valuesConfig); err 
!= nil {
+                               klog.Errorf("update error: %v", err)
+                       }
+               }
+       })
+       return w
+}
+
+func readConfigMap(cm *v1.ConfigMap, configKey, valuesKey string) (*Config, 
string, error) {
+       if cm == nil {
+               return nil, "", fmt.Errorf("no ConfigMap found")
+       }
+
+       configYaml, exists := cm.Data[configKey]
+       if !exists {
+               return nil, "", fmt.Errorf("missing ConfigMap config key %q", 
configKey)
+       }
+       c, err := unmarshalConfig([]byte(configYaml))
+       if err != nil {
+               return nil, "", fmt.Errorf("failed reading config: %v. 
YAML:\n%s", err, configYaml)
+       }
+
+       valuesConfig, exists := cm.Data[valuesKey]
+       if !exists {
+               return nil, "", fmt.Errorf("missing ConfigMap values key %q", 
valuesKey)
+       }
+       return c, valuesConfig, nil
+}
diff --git a/pkg/kube/inject/webhook.go b/pkg/kube/inject/webhook.go
index 5f8858f9..372ea741 100644
--- a/pkg/kube/inject/webhook.go
+++ b/pkg/kube/inject/webhook.go
@@ -1,4 +1,400 @@
 package inject
 
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       opconfig "github.com/apache/dubbo-kubernetes/operator/pkg/apis"
+       "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+       "gomodules.xyz/jsonpatch/v2"
+       meshconfig "istio.io/api/mesh/v1alpha1"
+       admissionv1 "k8s.io/api/admission/v1"
+       kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/runtime/serializer"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/klog/v2"
+       "net/http"
+       "os"
+       "sigs.k8s.io/yaml"
+       "strings"
+       "sync"
+       "text/template"
+       "time"
+)
+
+var (
+       runtimeScheme = runtime.NewScheme()
+       codecs        = serializer.NewCodecFactory(runtimeScheme)
+       deserializer  = codecs.UniversalDeserializer()
+)
+
+func init() {
+       _ = corev1.AddToScheme(runtimeScheme)
+       _ = admissionv1.AddToScheme(runtimeScheme)
+       _ = kubeApiAdmissionv1beta1.AddToScheme(runtimeScheme)
+}
+
+const (
+       watchDebounceDelay = 100 * time.Millisecond
+)
+
 type Webhook struct {
+       mu           sync.RWMutex
+       watcher      Watcher
+       meshConfig   *meshconfig.MeshConfig
+       env          *model.Environment
+       Config       *Config
+       valuesConfig ValuesConfig
+}
+
+func NewWebhook(p WebhookParameters) (*Webhook, error) {
+       if p.Mux == nil {
+               return nil, errors.New("expected mux to be passed, but was not 
passed")
+       }
+
+       wh := &Webhook{
+               watcher:    p.Watcher,
+               meshConfig: p.Env.Mesh(),
+               env:        p.Env,
+       }
+
+       sidecarConfig, valuesConfig, err := p.Watcher.Get()
+       if err != nil {
+               return nil, fmt.Errorf("failed to get initial configuration: 
%v", err)
+       }
+       if err := wh.updateConfig(sidecarConfig, valuesConfig); err != nil {
+               return nil, fmt.Errorf("failed to process webhook config: %v", 
err)
+       }
+
+       p.Mux.HandleFunc("/inject", wh.serveInject)
+       p.Mux.HandleFunc("/inject/", wh.serveInject)
+
+       p.Env.Watcher.AddMeshHandler(func() {
+               wh.mu.Lock()
+               wh.meshConfig = p.Env.Mesh()
+               wh.mu.Unlock()
+       })
+
+       return wh, nil
+}
+
+type WebhookParameters struct {
+       Watcher Watcher
+       Port    int
+       Env     *model.Environment
+       Mux     *http.ServeMux
+}
+
+type ValuesConfig struct {
+       raw      string
+       asStruct *opconfig.Values
+       asMap    map[string]any
+}
+
+func loadConfig(injectFile, valuesFile string) (*Config, string, error) {
+       data, err := os.ReadFile(injectFile)
+       if err != nil {
+               return nil, "", err
+       }
+       var c *Config
+       if c, err = unmarshalConfig(data); err != nil {
+               klog.Warningf("Failed to parse injectFile %s", string(data))
+               return nil, "", err
+       }
+
+       valuesConfig, err := os.ReadFile(valuesFile)
+       if err != nil {
+               return nil, "", err
+       }
+       return c, string(valuesConfig), nil
+}
+
+func unmarshalConfig(data []byte) (*Config, error) {
+       c, err := UnmarshalConfig(data)
+       if err != nil {
+               return nil, err
+       }
+       return &c, nil
+}
+
+func (wh *Webhook) updateConfig(sidecarConfig *Config, valuesConfig string) 
error {
+       wh.mu.Lock()
+       defer wh.mu.Unlock()
+       wh.Config = sidecarConfig
+       vc, err := NewValuesConfig(valuesConfig)
+       if err != nil {
+               return fmt.Errorf("failed to create new values config: %v", err)
+       }
+       wh.valuesConfig = vc
+       return nil
+}
+
+func NewValuesConfig(v string) (ValuesConfig, error) {
+       c := ValuesConfig{raw: v}
+       valuesStruct := &opconfig.Values{}
+       if err := protomarshal.ApplyYAML(v, valuesStruct); err != nil {
+               return c, fmt.Errorf("could not parse configuration values: 
%v", err)
+       }
+       c.asStruct = valuesStruct
+
+       values := map[string]any{}
+       if err := yaml.Unmarshal([]byte(v), &values); err != nil {
+               return c, fmt.Errorf("could not parse configuration values: 
%v", err)
+       }
+       c.asMap = values
+       return c, nil
+}
+
+func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
+       var body []byte
+       if r.Body != nil {
+               if data, err := kube.HTTPConfigReader(r); err == nil {
+                       body = data
+               } else {
+                       http.Error(w, err.Error(), http.StatusBadRequest)
+                       return
+               }
+       }
+       if len(body) == 0 {
+               http.Error(w, "no body found", http.StatusBadRequest)
+               return
+       }
+
+       // verify the content type is accurate
+       contentType := r.Header.Get("Content-Type")
+       if contentType != "application/json" {
+               http.Error(w, "invalid Content-Type, want `application/json`", 
http.StatusUnsupportedMediaType)
+               return
+       }
+
+       path := ""
+       if r.URL != nil {
+               path = r.URL.Path
+       }
+
+       var reviewResponse *kube.AdmissionResponse
+       var obj runtime.Object
+       var ar *kube.AdmissionReview
+       if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
+               reviewResponse = toAdmissionResponse(err)
+       } else {
+               ar, err = kube.AdmissionReviewKubeToAdapter(out)
+               if err != nil {
+                       reviewResponse = toAdmissionResponse(err)
+               } else {
+                       reviewResponse = wh.inject(ar, path)
+               }
+       }
+
+       response := kube.AdmissionReview{}
+       response.Response = reviewResponse
+       var responseKube runtime.Object
+       var apiVersion string
+       if ar != nil {
+               apiVersion = ar.APIVersion
+               response.TypeMeta = ar.TypeMeta
+               if response.Response != nil {
+                       if ar.Request != nil {
+                               response.Response.UID = ar.Request.UID
+                       }
+               }
+       }
+       responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
+       resp, err := json.Marshal(responseKube)
+       if err != nil {
+               http.Error(w, fmt.Sprintf("could not encode response: %v", 
err), http.StatusInternalServerError)
+               return
+       }
+       if _, err := w.Write(resp); err != nil {
+               klog.Errorf("Could not write response: %v", err)
+               http.Error(w, fmt.Sprintf("could not write response: %v", err), 
http.StatusInternalServerError)
+               return
+       }
+}
+
+type InjectionParameters struct {
+       pod                 *corev1.Pod
+       deployMeta          types.NamespacedName
+       namespace           *corev1.Namespace
+       nativeSidecar       bool
+       typeMeta            metav1.TypeMeta
+       templates           map[string]*template.Template
+       defaultTemplate     []string
+       aliases             map[string][]string
+       meshConfig          *meshconfig.MeshConfig
+       proxyConfig         *meshconfig.ProxyConfig
+       valuesConfig        ValuesConfig
+       revision            string
+       proxyEnvs           map[string]string
+       injectedAnnotations map[string]string
+}
+
+func injectPod(req InjectionParameters) ([]byte, error) {
+       // The patch will be built relative to the initial pod, capture its 
current state
+       originalPodSpec, err := json.Marshal(req.pod)
+       if err != nil {
+               return nil, err
+       }
+
+       // Run the injection template, giving us a partial pod spec
+       mergedPod, injectedPodData, err := RunTemplate(req)
+       if err != nil {
+               return nil, fmt.Errorf("failed to run injection template: %v", 
err)
+       }
+
+       mergedPod, err = reapplyOverwrittenContainers(mergedPod, req.pod, 
injectedPodData, req.proxyConfig)
+       if err != nil {
+               return nil, fmt.Errorf("failed to re apply container: %v", err)
+       }
+
+       // Apply some additional transformations to the pod
+       if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
+               return nil, fmt.Errorf("failed to process pod: %v", err)
+       }
+
+       patch, err := createPatch(mergedPod, originalPodSpec)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create patch: %v", err)
+       }
+
+       return patch, nil
+}
+
+func reapplyOverwrittenContainers(finalPod *corev1.Pod, originalPod 
*corev1.Pod, templatePod *corev1.Pod, proxyConfig *meshconfig.ProxyConfig) 
(*corev1.Pod, error) {
+       return finalPod, nil
+}
+
+func createPatch(pod *corev1.Pod, original []byte) ([]byte, error) {
+       reinjected, err := json.Marshal(pod)
+       if err != nil {
+               return nil, err
+       }
+       p, err := jsonpatch.CreatePatch(original, reinjected)
+       if err != nil {
+               return nil, err
+       }
+       return json.Marshal(p)
+}
+
+func postProcessPod(pod *corev1.Pod, injectedPod corev1.Pod, req 
InjectionParameters) error {
+       if pod.Annotations == nil {
+               pod.Annotations = map[string]string{}
+       }
+       if pod.Labels == nil {
+               pod.Labels = map[string]string{}
+       }
+       return nil
+}
+
+func parseInjectEnvs(path string) map[string]string {
+       path = strings.TrimSuffix(path, "/")
+       res := func(path string) []string {
+               parts := strings.SplitN(path, "/", 3)
+               var newRes []string
+               if len(parts) == 3 { // If length is less than 3, then the path 
is simply "/inject".
+                       if strings.HasPrefix(parts[2], ":ENV:") {
+                               // Deprecated, not recommended.
+                               //    Note that this syntax fails validation 
when used to set injectionPath (i.e., service.path in mwh).
+                               //    It doesn't fail validation when used to 
set injectionURL, however. K8s bug maybe?
+                               pairs := strings.Split(parts[2], ":ENV:")
+                               for i := 1; i < len(pairs); i++ { // skip the 
first part, it is a nil
+                                       pair := strings.SplitN(pairs[i], "=", 2)
+                                       // The first part is the variable name 
which can not be empty
+                                       // the second part is the variable 
value which can be empty but has to exist
+                                       // for example, aaa=bbb, aaa= are 
valid, but =aaa or = are not valid, the
+                                       // invalid ones will be ignored.
+                                       if len(pair[0]) > 0 && len(pair) == 2 {
+                                               newRes = append(newRes, pair...)
+                                       }
+                               }
+                               return newRes
+                       }
+                       newRes = strings.Split(parts[2], "/")
+               }
+               for i, value := range newRes {
+                       if i%2 != 0 {
+                               // Replace --slash-- with / in values.
+                               newRes[i] = strings.ReplaceAll(value, 
"--slash--", "/")
+                       }
+               }
+               return newRes
+       }(path)
+       newEnvs := make(map[string]string)
+
+       for i := 0; i < len(res); i += 2 {
+               k := res[i]
+               if i == len(res)-1 { // ignore the last key without value
+                       klog.Warningf("Odd number of inject env entries, ignore 
the last key %s\n", k)
+                       break
+               }
+       }
+
+       return newEnvs
+}
+
+func (wh *Webhook) inject(ar *kube.AdmissionReview, path string) 
*kube.AdmissionResponse {
+       req := ar.Request
+       var pod corev1.Pod
+       if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
+               return toAdmissionResponse(err)
+       }
+
+       pod.ManagedFields = nil
+
+       klog.Infof("Process proxyless injection request")
+
+       proxyConfig := wh.env.GetProxyConfigOrDefault(pod.Namespace, 
pod.Labels, pod.Annotations, wh.meshConfig)
+       deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
+       params := InjectionParameters{
+               pod:                 &pod,
+               deployMeta:          deploy,
+               typeMeta:            typeMeta,
+               templates:           wh.Config.Templates,
+               defaultTemplate:     wh.Config.DefaultTemplates,
+               aliases:             wh.Config.Aliases,
+               meshConfig:          wh.meshConfig,
+               proxyConfig:         proxyConfig,
+               valuesConfig:        wh.valuesConfig,
+               injectedAnnotations: wh.Config.InjectedAnnotations,
+               proxyEnvs:           parseInjectEnvs(path),
+       }
+
+       patchBytes, err := injectPod(params)
+       if err != nil {
+               return toAdmissionResponse(err)
+       }
+       reviewResponse := kube.AdmissionResponse{
+               Allowed: true,
+               Patch:   patchBytes,
+               PatchType: func() *string {
+                       pt := "JSONPatch"
+                       return &pt
+               }(),
+       }
+       return &reviewResponse
+}
+
+func (wh *Webhook) Run(stop <-chan struct{}) {
+       go wh.watcher.Run(stop)
+}
+
+func ParseTemplates(tmpls RawTemplates) (Templates, error) {
+       ret := make(Templates, len(tmpls))
+       for k, t := range tmpls {
+               p, err := parseDryTemplate(t, InjectionFuncmap)
+               if err != nil {
+                       return nil, err
+               }
+               ret[k] = p
+       }
+       return ret, nil
+}
+
+func toAdmissionResponse(err error) *kube.AdmissionResponse {
+       return &kube.AdmissionResponse{Result: &metav1.Status{Message: 
err.Error()}}
 }
diff --git a/pkg/kube/util.go b/pkg/kube/util.go
index 6491fac0..10309ed3 100644
--- a/pkg/kube/util.go
+++ b/pkg/kube/util.go
@@ -18,15 +18,25 @@
 package kube
 
 import (
+       "fmt"
        "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+       "io"
        corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/serializer"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/clientcmd"
+       "net/http"
        "os"
+       "regexp"
+       "strings"
 )
 
+var cronJobNameRegexp = regexp.MustCompile(`(.+)-\d{8,10}$`)
+
 func DefaultRestConfig(kubeconfig, configContext string, fns ...func(config 
*rest.Config)) (*rest.Config, error) {
        config, err := BuildClientConfig(kubeconfig, configContext)
        if err != nil {
@@ -90,3 +100,95 @@ func SetRestDefaults(config *rest.Config) *rest.Config {
        }
        return config
 }
+
+const MaxRequestBodyBytes = int64(6 * 1024 * 1024)
+
+func HTTPConfigReader(req *http.Request) ([]byte, error) {
+       defer req.Body.Close()
+       lr := &io.LimitedReader{
+               R: req.Body,
+               N: MaxRequestBodyBytes + 1,
+       }
+       data, err := io.ReadAll(lr)
+       if err != nil {
+               return nil, err
+       }
+       if lr.N <= 0 {
+               return nil, 
errors.NewRequestEntityTooLargeError(fmt.Sprintf("limit is %d", 
MaxRequestBodyBytes))
+       }
+       return data, nil
+}
+
+func GetDeployMetaFromPod(pod *corev1.Pod) (types.NamespacedName, 
metav1.TypeMeta) {
+       if pod == nil {
+               return types.NamespacedName{}, metav1.TypeMeta{}
+       }
+       // try to capture more useful namespace/name info for deployments, etc.
+       // TODO(dougreid): expand to enable lookup of OWNERs recursively a la 
kubernetesenv
+
+       deployMeta := types.NamespacedName{Name: pod.Name, Namespace: 
pod.Namespace}
+
+       typeMetadata := metav1.TypeMeta{
+               Kind:       "Pod",
+               APIVersion: "v1",
+       }
+       if len(pod.GenerateName) > 0 {
+               // if the pod name was generated (or is scheduled for 
generation), we can begin an investigation into the controlling reference for 
the pod.
+               var controllerRef metav1.OwnerReference
+               controllerFound := false
+               for _, ref := range pod.GetOwnerReferences() {
+                       if ref.Controller != nil && *ref.Controller {
+                               controllerRef = ref
+                               controllerFound = true
+                               break
+                       }
+               }
+               if controllerFound {
+                       typeMetadata.APIVersion = controllerRef.APIVersion
+                       typeMetadata.Kind = controllerRef.Kind
+
+                       // heuristic for deployment detection
+                       deployMeta.Name = controllerRef.Name
+                       if typeMetadata.Kind == "ReplicaSet" && 
pod.Labels["pod-template-hash"] != "" && strings.HasSuffix(controllerRef.Name, 
pod.Labels["pod-template-hash"]) {
+                               name := strings.TrimSuffix(controllerRef.Name, 
"-"+pod.Labels["pod-template-hash"])
+                               deployMeta.Name = name
+                               typeMetadata.Kind = "Deployment"
+                       } else if typeMetadata.Kind == "ReplicaSet" && 
pod.Labels["rollouts-pod-template-hash"] != "" &&
+                               strings.HasSuffix(controllerRef.Name, 
pod.Labels["rollouts-pod-template-hash"]) {
+                               // Heuristic for ArgoCD Rollout
+                               name := strings.TrimSuffix(controllerRef.Name, 
"-"+pod.Labels["rollouts-pod-template-hash"])
+                               deployMeta.Name = name
+                               typeMetadata.Kind = "Rollout"
+                               typeMetadata.APIVersion = "v1alpha1"
+                       } else if typeMetadata.Kind == "ReplicationController" 
&& pod.Labels["deploymentconfig"] != "" {
+                               // If the pod is controlled by the replication 
controller, which is created by the DeploymentConfig resource in
+                               // Openshift platform, set the deploy name to 
the deployment config's name, and the kind to 'DeploymentConfig'.
+                               //
+                               // nolint: lll
+                               // For DeploymentConfig details, refer to
+                               // 
https://docs.openshift.com/container-platform/4.1/applications/deployments/what-deployments-are.html#deployments-and-deploymentconfigs_what-deployments-are
+                               //
+                               // For the reference to the pod label 
'deploymentconfig', refer to
+                               // 
https://github.com/openshift/library-go/blob/7a65fdb398e28782ee1650959a5e0419121e97ae/pkg/apps/appsutil/const.go#L25
+                               deployMeta.Name = pod.Labels["deploymentconfig"]
+                               typeMetadata.Kind = "DeploymentConfig"
+                       } else if typeMetadata.Kind == "Job" {
+                               // If job name suffixed with 
`-<digit-timestamp>`, where the length of digit timestamp is 8~10,
+                               // trim the suffix and set kind to cron job.
+                               if jn := 
cronJobNameRegexp.FindStringSubmatch(controllerRef.Name); len(jn) == 2 {
+                                       deployMeta.Name = jn[1]
+                                       typeMetadata.Kind = "CronJob"
+                                       // heuristically set cron job api 
version to v1 as it cannot be derived from pod metadata.
+                                       typeMetadata.APIVersion = "batch/v1"
+                               }
+                       }
+               }
+       }
+
+       if deployMeta.Name == "" {
+               // if we haven't been able to extract a deployment name, then 
just give it the pod name
+               deployMeta.Name = pod.Name
+       }
+
+       return deployMeta, typeMetadata
+}
diff --git a/pkg/kube/watcher/configmapwatcher/configmapwatcher.go 
b/pkg/kube/watcher/configmapwatcher/configmapwatcher.go
new file mode 100644
index 00000000..3225f61e
--- /dev/null
+++ b/pkg/kube/watcher/configmapwatcher/configmapwatcher.go
@@ -0,0 +1,64 @@
+package configmapwatcher
+
+import (
+       "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+       "go.uber.org/atomic"
+       v1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/fields"
+       "k8s.io/apimachinery/pkg/types"
+)
+
+type Controller struct {
+       configmaps kclient.Client[*v1.ConfigMap]
+       queue      controllers.Queue
+
+       configMapNamespace string
+       configMapName      string
+       callback           func(*v1.ConfigMap)
+
+       hasSynced atomic.Bool
+}
+
+func NewController(client kube.Client, namespace, name string, callback 
func(*v1.ConfigMap)) *Controller {
+       c := &Controller{
+               configMapNamespace: namespace,
+               configMapName:      name,
+               callback:           callback,
+       }
+
+       c.configmaps = kclient.NewFiltered[*v1.ConfigMap](client, 
kclient.Filter{
+               Namespace:     namespace,
+               FieldSelector: 
fields.OneTermEqualSelector(metav1.ObjectNameField, name).String(),
+       })
+
+       c.queue = controllers.NewQueue("configmap "+name, 
controllers.WithReconciler(c.processItem))
+       
c.configmaps.AddEventHandler(controllers.FilteredObjectSpecHandler(c.queue.AddObject,
 func(o controllers.Object) bool {
+               // Filter out other configmaps
+               return o.GetName() == name && o.GetNamespace() == namespace
+       }))
+
+       return c
+}
+
+func (c *Controller) processItem(name types.NamespacedName) error {
+       cm := c.configmaps.Get(name.Name, name.Namespace)
+       c.callback(cm)
+
+       c.hasSynced.Store(true)
+       return nil
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+       // Start informer immediately instead of with the rest. This is because 
we use configmapwatcher for
+       // single types (so its never shared), and for use cases where we need 
the results immediately
+       // during startup.
+       c.configmaps.Start(stop)
+       if !kube.WaitForCacheSync("configmap "+c.configMapName, stop, 
c.configmaps.HasSynced) {
+               c.queue.ShutDownEarly()
+               return
+       }
+       c.queue.Run(stop)
+}
diff --git a/pkg/webhooks/util/util.go b/pkg/webhooks/util/util.go
new file mode 100644
index 00000000..bdd099ec
--- /dev/null
+++ b/pkg/webhooks/util/util.go
@@ -0,0 +1,45 @@
+package util
+
+import (
+       "crypto/x509"
+       "encoding/pem"
+       "errors"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+)
+
+type ConfigError struct {
+       err    error
+       reason string
+}
+
+func (e ConfigError) Error() string {
+       return e.err.Error()
+}
+
+func (e ConfigError) Reason() string {
+       return e.reason
+}
+
+func LoadCABundle(caBundleWatcher *keycertbundle.Watcher) ([]byte, error) {
+       caBundle := caBundleWatcher.GetCABundle()
+       if err := VerifyCABundle(caBundle); err != nil {
+               return nil, &ConfigError{err, "could not verify caBundle"}
+       }
+
+       return caBundle, nil
+}
+
+func VerifyCABundle(caBundle []byte) error {
+       block, _ := pem.Decode(caBundle)
+       if block == nil {
+               return errors.New("could not decode pem")
+       }
+       if block.Type != "CERTIFICATE" {
+               return fmt.Errorf("cert contains wrong pem type: %q", 
block.Type)
+       }
+       if _, err := x509.ParseCertificate(block.Bytes); err != nil {
+               return fmt.Errorf("cert contains invalid x509 certificate: %v", 
err)
+       }
+       return nil
+}
diff --git a/pkg/webhooks/webhookpatch.go b/pkg/webhooks/webhookpatch.go
new file mode 100644
index 00000000..71c7b7b4
--- /dev/null
+++ b/pkg/webhooks/webhookpatch.go
@@ -0,0 +1,142 @@
+package webhooks
+
+import (
+       "bytes"
+       "errors"
+       kubelib "github.com/apache/dubbo-kubernetes/pkg/kube"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+       "github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+       "istio.io/api/label"
+       v1 "k8s.io/api/admissionregistration/v1"
+       kerrors "k8s.io/apimachinery/pkg/api/errors"
+       klabels "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/client-go/util/workqueue"
+       "k8s.io/klog/v2"
+       "math"
+       "strings"
+       "time"
+)
+
+var (
+       errWrongRevision     = errors.New("webhook does not belong to target 
revision")
+       errNotFound          = errors.New("webhook not found")
+       errNoWebhookWithName = errors.New("webhook configuration did not 
contain webhook with target name")
+)
+
+type WebhookCertPatcher struct {
+       // revision to patch webhooks for
+       revision    string
+       webhookName string
+
+       queue controllers.Queue
+
+       // File path to the x509 certificate bundle used by the webhook server
+       // and patched into the webhook config.
+       CABundleWatcher *keycertbundle.Watcher
+
+       webhooks kclient.Client[*v1.MutatingWebhookConfiguration]
+}
+
+func NewWebhookCertPatcher(client kubelib.Client, webhookName string, 
caBundleWatcher *keycertbundle.Watcher) (*WebhookCertPatcher, error) {
+       p := &WebhookCertPatcher{
+               webhookName:     webhookName,
+               CABundleWatcher: caBundleWatcher,
+       }
+       p.queue = newWebhookPatcherQueue(p.webhookPatchTask)
+
+       p.webhooks = kclient.New[*v1.MutatingWebhookConfiguration](client)
+       p.webhooks.AddEventHandler(controllers.ObjectHandler(p.queue.AddObject))
+
+       return p, nil
+}
+
+func newWebhookPatcherQueue(reconciler controllers.ReconcilerFn) 
controllers.Queue {
+       return controllers.NewQueue("webhook patcher",
+               controllers.WithReconciler(reconciler),
+               
controllers.WithRateLimiter(workqueue.NewTypedItemFastSlowRateLimiter[any](100*time.Millisecond,
 1*time.Minute, 5)),
+               controllers.WithMaxAttempts(math.MaxInt))
+}
+
+func (w *WebhookCertPatcher) webhookPatchTask(o types.NamespacedName) error {
+       err := w.patchMutatingWebhookConfig(o.Name)
+
+       // do not want to retry the task if these errors occur, they indicate 
that
+       // we should no longer be patching the given webhook
+       if kerrors.IsNotFound(err) || errors.Is(err, errWrongRevision) || 
errors.Is(err, errNoWebhookWithName) || errors.Is(err, errNotFound) {
+               return nil
+       }
+
+       if err != nil {
+               klog.Errorf("patching webhook %s failed: %v", o.Name, err)
+       }
+
+       return err
+}
+
+func (w *WebhookCertPatcher) patchMutatingWebhookConfig(webhookConfigName 
string) error {
+       config := w.webhooks.Get(webhookConfigName, "")
+       if config == nil {
+               return errNotFound
+       }
+       // prevents a race condition between multiple istiods when the revision 
is changed or modified
+       v, ok := config.Labels[label.IoIstioRev.Name]
+       if !ok {
+               return nil
+       }
+       if v != w.revision {
+               return errWrongRevision
+       }
+
+       found := false
+       updated := false
+       caCertPem, err := util.LoadCABundle(w.CABundleWatcher)
+       if err != nil {
+               klog.Errorf("Failed to load CA bundle: %v", err)
+               return err
+       }
+       for i, wh := range config.Webhooks {
+               if strings.HasSuffix(wh.Name, w.webhookName) {
+                       if !bytes.Equal(caCertPem, 
config.Webhooks[i].ClientConfig.CABundle) {
+                               updated = true
+                       }
+                       config.Webhooks[i].ClientConfig.CABundle = caCertPem
+                       found = true
+               }
+       }
+       if !found {
+               return errNoWebhookWithName
+       }
+
+       if updated {
+               _, err := w.webhooks.Update(config)
+               if err != nil {
+               }
+       }
+
+       return err
+}
+
+func (w *WebhookCertPatcher) Run(stopChan <-chan struct{}) {
+       go w.startCaBundleWatcher(stopChan)
+       w.webhooks.Start(stopChan)
+       kubelib.WaitForCacheSync("webhook patcher", stopChan, 
w.webhooks.HasSynced)
+       w.queue.Run(stopChan)
+}
+
+func (w *WebhookCertPatcher) startCaBundleWatcher(stop <-chan struct{}) {
+       id, watchCh := w.CABundleWatcher.AddWatcher()
+       defer w.CABundleWatcher.RemoveWatcher(id)
+       for {
+               select {
+               case <-watchCh:
+                       for _, whc := range w.webhooks.List("", 
klabels.Everything()) {
+                               w.queue.AddObject(whc)
+                       }
+               case <-stop:
+                       return
+               }
+       }
+}
diff --git a/sail/cmd/sail-discovery/app/cmd.go 
b/sail/cmd/sail-discovery/app/cmd.go
index 12eb57bb..2b0f24dd 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -89,6 +89,9 @@ func newDiscoveryCommand() *cobra.Command {
 func addFlags(c *cobra.Command) {
        serverArgs = bootstrap.NewSailArgs(func(p *bootstrap.SailArgs) {
                p.CtrlZOptions = ctrlz.DefaultOptions()
+               p.InjectionOptions = bootstrap.InjectionOptions{
+                       InjectionDirectory: "./var/lib/dubbo/inject",
+               }
        })
        
c.PersistentFlags().StringSliceVar(&serverArgs.RegistryOptions.Registries, 
"registries",
                []string{string(provider.Kubernetes)},
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index e4facdc9..d262d4b3 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -39,9 +39,15 @@ type RegistryOptions struct {
        ClusterRegistriesNamespace string
 }
 
+type InjectionOptions struct {
+       // Directory of injection related config files.
+       InjectionDirectory string
+}
+
 type SailArgs struct {
        ServerOptions      DiscoveryServerOptions
        RegistryOptions    RegistryOptions
+       InjectionOptions   InjectionOptions
        MeshConfigFile     string
        NetworksConfigFile string
        PodName            string
diff --git a/sail/pkg/bootstrap/proxylessinjector.go 
b/sail/pkg/bootstrap/proxylessinjector.go
new file mode 100644
index 00000000..6c35b198
--- /dev/null
+++ b/sail/pkg/bootstrap/proxylessinjector.go
@@ -0,0 +1,100 @@
+package bootstrap
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/env"
+       "github.com/apache/dubbo-kubernetes/pkg/kube/inject"
+       "github.com/apache/dubbo-kubernetes/pkg/webhooks"
+       "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+       "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/klog/v2"
+       "os"
+       "path/filepath"
+)
+
+const (
+       webhookName                  = "proxyless-injector.dubbo.io"
+       defaultInjectorConfigMapName = "dubbo-proxyless-injector"
+)
+
+var injectionEnabled = env.Register("INJECT_ENABLED", true, "Enable mutating 
webhook handler.")
+
+func (s *Server) initProxylessInjector(args *SailArgs) (*inject.Webhook, 
error) {
+       // currently the constant: "./var/lib/dubbo/inject"
+       injectPath := args.InjectionOptions.InjectionDirectory
+       if injectPath == "" || !injectionEnabled.Get() {
+               klog.Infof("Skipping proxyless injector, injection path is 
missing or disabled.")
+               return nil, nil
+       }
+
+       // If the injection config exists either locally or remotely, we will 
set up injection.
+       var watcher inject.Watcher
+       if _, err := os.Stat(filepath.Join(injectPath, "config")); 
!os.IsNotExist(err) {
+               configFile := filepath.Join(injectPath, "config")
+               valuesFile := filepath.Join(injectPath, "values")
+               watcher, err = inject.NewFileWatcher(configFile, valuesFile)
+               if err != nil {
+                       return nil, err
+               }
+       } else if s.kubeClient != nil {
+               configMapName := getInjectorConfigMapName("")
+               cms := s.kubeClient.Kube().CoreV1().ConfigMaps(args.Namespace)
+               if _, err := cms.Get(context.TODO(), configMapName, 
metav1.GetOptions{}); err != nil {
+                       if errors.IsNotFound(err) {
+                               klog.Infof("Skipping proxyless injector, 
template not found")
+                               return nil, nil
+                       }
+                       return nil, err
+               }
+               watcher = inject.NewConfigMapWatcher(s.kubeClient, 
args.Namespace, configMapName, "config", "values")
+       } else {
+               klog.Infof("Skipping proxyless injector, template not found")
+               return nil, nil
+       }
+
+       klog.Info("initializing proxyless injector")
+
+       parameters := inject.WebhookParameters{
+               Watcher: watcher,
+               Env:     s.environment,
+               Mux:     s.httpsMux,
+       }
+
+       wh, err := inject.NewWebhook(parameters)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create injection webhook: 
%v", err)
+       }
+       // Patch cert if a webhook config name is provided.
+       // This requires RBAC permissions - a low-priv Istiod should not 
attempt to patch but rely on
+       // operator or CI/CD
+       if features.InjectionWebhookConfigName != "" {
+               s.addStartFunc("injection patcher", func(stop <-chan struct{}) 
error {
+                       // No leader election - different istiod revisions will 
patch their own cert.
+                       // update webhook configuration by watching the cabundle
+                       patcher, err := 
webhooks.NewWebhookCertPatcher(s.kubeClient, webhookName, 
s.dubbodCertBundleWatcher)
+                       if err != nil {
+                               klog.Errorf("failed to create webhook cert 
patcher: %v", err)
+                               return nil
+                       }
+
+                       go patcher.Run(stop)
+                       return nil
+               })
+       }
+
+       s.addStartFunc("injection server", func(stop <-chan struct{}) error {
+               wh.Run(stop)
+               return nil
+       })
+       return wh, nil
+}
+
+func getInjectorConfigMapName(revision string) string {
+       name := defaultInjectorConfigMapName
+       if revision == "" || revision == "default" {
+               return name
+       }
+       return name + "-" + revision
+}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 1a60f80d..5a427f50 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -50,6 +50,7 @@ import (
        caserver "github.com/apache/dubbo-kubernetes/security/pkg/server/ca"
        "github.com/fsnotify/fsnotify"
        grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
+       "go.uber.org/atomic"
        "golang.org/x/net/http2"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
@@ -111,9 +112,16 @@ type Server struct {
 
        readinessProbes map[string]readinessProbe
 
+       readinessFlags *readinessFlags
+
        webhookInfo *webhookInfo
 }
 
+type readinessFlags struct {
+       proxylessInjectorReady atomic.Bool
+       configValidationReady  atomic.Bool
+}
+
 type webhookInfo struct {
        mu sync.RWMutex
        wh *inject.Webhook
@@ -139,6 +147,8 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) 
(*Server, error) {
                dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
                fileWatcher:             filewatcher.NewWatcher(),
                internalStop:            make(chan struct{}),
+               readinessFlags:          &readinessFlags{},
+               webhookInfo:             &webhookInfo{},
        }
        for _, fn := range initFuncs {
                fn(s)
@@ -216,6 +226,15 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) 
(*Server, error) {
 
        if s.kubeClient != nil {
                s.initSecureWebhookServer(args)
+               wh, err := s.initProxylessInjector(args)
+               if err != nil {
+                       return nil, fmt.Errorf("error initializing proxyless 
injector: %v", err)
+               }
+               s.readinessFlags.proxylessInjectorReady.Store(true)
+               s.webhookInfo.mu.Lock()
+               s.webhookInfo.wh = wh
+               s.webhookInfo.mu.Unlock()
+
                if err := s.initConfigValidation(args); err != nil {
                        return nil, fmt.Errorf("error initializing config 
validator: %v", err)
                }
diff --git a/sail/pkg/features/sail.go b/sail/pkg/features/sail.go
index 08213a17..f0a22a5c 100644
--- a/sail/pkg/features/sail.go
+++ b/sail/pkg/features/sail.go
@@ -62,4 +62,6 @@ var (
        DubbodServiceCustomHost = env.Register("DUBBOD_CUSTOM_HOST", "",
                "Custom host name of dubbod that dubbod signs the server cert. 
"+
                        "Multiple custom host names are supported, and multiple 
values are separated by commas.").Get()
+       InjectionWebhookConfigName = 
env.Register("INJECTION_WEBHOOK_CONFIG_NAME", "istio-sidecar-injector",
+               "Name of the mutatingwebhookconfiguration to patch, if istioctl 
is not used.").Get()
 )
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 98c0bd75..35e39be1 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -100,6 +100,10 @@ func (e *Environment) GetDiscoveryAddress() (host.Name, 
string, error) {
        return host.Name(hostname), port, nil
 }
 
+func (e *Environment) GetProxyConfigOrDefault(ns string, labels, annotations 
map[string]string, meshConfig *meshconfig.MeshConfig) *meshconfig.ProxyConfig {
+       return mesh.DefaultProxyConfig()
+}
+
 func (e *Environment) ClusterLocal() ClusterLocalProvider {
        return e.clusterLocalServices
 }

Reply via email to