This is an automated email from the ASF dual-hosted git repository.

zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db06b24 [security] add aegis corecode (#796)
5db06b24 is described below

commit 5db06b247bb2dd54834b9061571c8d0bdab478d2
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Sep 30 00:55:59 2025 +0800

    [security] add aegis corecode (#796)
---
 .licenserc.yaml                                    |  23 ----
 go.mod                                             |   4 +-
 go.sum                                             |   4 +-
 pkg/cmd/cmd.go                                     |   7 ++
 pkg/kube/client.go                                 |  82 ++++++++++++++-
 pkg/security/security.go                           |  58 ++++++++--
 sail/cmd/sail-discovery/app/cmd.go                 |   3 +
 sail/pkg/bootstrap/ca.go                           |  83 +++++++++++++++
 sail/pkg/bootstrap/options.go                      |   1 +
 sail/pkg/bootstrap/server.go                       |  61 +++++++++++
 sail/pkg/bootstrap/validation.go                   |  13 +++
 sail/pkg/bootstrap/webhook.go                      |  52 +++++++++
 sail/pkg/model/xds_cache.go                        |   9 +-
 sail/pkg/security/model/authentication.go          |   5 +
 sail/pkg/xds/discovery.go                          |  60 ++++++++---
 .../server/ca/authenticate/cert_authenticator.go   |  85 +++++++++++++++
 security/pkg/server/ca/authenticate/oidc.go        | 117 +++++++++++++++++++++
 security/pkg/server/ca/server.go                   |  30 ++++++
 security/pkg/util/jwtutil.go                       |  14 +++
 19 files changed, 661 insertions(+), 50 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3de0b608..4ca86d88 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -24,8 +24,6 @@ header:
     - '**/.settings/*'
     - '**/.classpath'
     - '**/.project'
-    - '**/target/**'
-    - '**/generated/**'
     - '**/*.log'
     - '**/codestyle/*'
     - '**/resources/META-INF/**'
@@ -36,9 +34,7 @@ header:
     - '**/*.load'
     - '**/*.flex'
     - '**/*.fc'
-    - '**/*.javascript'
     - '**/*.properties'
-    - '**/*.thrift'
     - '**/*.pb.go'
     - '**/*.sh'
     - '**/*.bat'
@@ -54,7 +50,6 @@ header:
     - '**/*.schemas'
     - '**/*.nojekyll'
     - '**/.browserslistrc'
-    - '**/yarn.lock'
     - '.git/'
     - '.github/**'
     - '**/.gitignore'
@@ -64,33 +59,15 @@ header:
     - 'compiler/**'
     - '.gitmodules'
     - '**/.gitrepo'
-    - '.mvn'
-    - 'mvnw'
-    - 'mvnw.cmd'
     - 'LICENSE'
     - 'NOTICE'
     - 'CNAME'
-    - 'Jenkinsfile'
-    - '**/vendor/**'
     - '**/go.sum'
-    - '**/addons/manifests/**'
-    - '**/dubbo-ui/dist/**'
-    - '**/ui/**'
-    - '**/governance_config_mock.go'
     - '**/templates/**'
     - '**/testdata/**'
     - '**/deploy.tpl'
     - '**/docs/**'
     - '**/.husky/pre-commit'
-    - '**/.nvmrc'
-    - '**/**.txtar'
-    - '**/**gen.go'
-    - '**/api/**'
-    - '**/deploy/**'
-    - '**/mk/**'
-    - '**/test/**'
-    - '**/tools/**'
-    - '**/apis/**'
   comment: on-failure
 
   license-location-threshold: 130
diff --git a/go.mod b/go.mod
index 2a974281..c9d60e36 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
        github.com/chzyer/readline v1.5.1
        github.com/containers/image/v5 v5.34.0
        github.com/containers/storage v1.57.1
+       github.com/coreos/go-oidc/v3 v3.12.0
        github.com/docker/cli v28.3.3+incompatible
        github.com/docker/docker v27.5.1+incompatible
        github.com/docker/docker-credential-helpers v0.9.3
@@ -58,12 +59,12 @@ require (
        golang.org/x/net v0.43.0
        golang.org/x/sys v0.35.0
        golang.org/x/term v0.34.0
+       golang.org/x/time v0.12.0
        google.golang.org/grpc v1.74.2
        google.golang.org/protobuf v1.36.7
        gopkg.in/yaml.v3 v3.0.1
        helm.sh/helm/v3 v3.18.6
        istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee
-       istio.io/client-go v1.26.0-alpha.0.0.20250908201345-99e026bfe54f
        k8s.io/api v0.33.4
        k8s.io/apiextensions-apiserver v0.33.4
        k8s.io/apimachinery v0.33.4
@@ -239,7 +240,6 @@ require (
        golang.org/x/oauth2 v0.30.0 // indirect
        golang.org/x/sync v0.16.0 // indirect
        golang.org/x/text v0.28.0 // indirect
-       golang.org/x/time v0.12.0 // indirect
        google.golang.org/genproto/googleapis/api v0.0.0-20250811230008-5f3141c8851a // indirect
        google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect
        gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
diff --git a/go.sum b/go.sum
index bfe17708..310ea56b 100644
--- a/go.sum
+++ b/go.sum
@@ -201,6 +201,8 @@ github.com/containers/image/v5 v5.34.0 h1:HPqQaDUsox/3mC1pbOyLAIQEp0JhQqiUZ+6JiF
 github.com/containers/image/v5 v5.34.0/go.mod h1:/WnvUSEfdqC/ahMRd4YJDBLrpYWkGl018rB77iB3FDo=
 github.com/containers/storage v1.57.1 h1:hKPoFsuBcB3qTzBxa4IFpZMRzUuL5Xhv/BE44W0XHx8=
 github.com/containers/storage v1.57.1/go.mod h1:i/Hb4lu7YgFr9G0K6BMjqW0BLJO1sFsnWQwj2UoWCUM=
+github.com/coreos/go-oidc/v3 v3.12.0 h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo=
+github.com/coreos/go-oidc/v3 v3.12.0/go.mod h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0=
 github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
 github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
 github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
@@ -868,8 +870,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee h1:pDLJDbgzc69hw3EQHIhO+j/efHketfwARg3eL+QIqss=
 istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee/go.mod h1:BD3qv/ekm16kvSgvSpuiDawgKhEwG97wx849CednJSg=
-istio.io/client-go v1.26.0-alpha.0.0.20250908201345-99e026bfe54f h1:gTRu3JTMDWZ2Qi7RcYIHXK2PPmiCtRAZ47QOiSooCxM=
-istio.io/client-go v1.26.0-alpha.0.0.20250908201345-99e026bfe54f/go.mod h1:k9fittgsfNR2AkUGLYRkEyTbSa1marHI4oqsGkE4AEI=
 k8s.io/api v0.33.4 h1:oTzrFVNPXBjMu0IlpA2eDDIU49jsuEorGHB4cvKupkk=
 k8s.io/api v0.33.4/go.mod h1:VHQZ4cuxQ9sCUMESJV5+Fe8bGnqAARZ08tSTdHWfeAc=
 k8s.io/apiextensions-apiserver v0.33.4 h1:rtq5SeXiDbXmSwxsF0MLe2Mtv3SwprA6wp+5qh/CrOU=
diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index 0d9d4719..5d216b66 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -38,6 +38,13 @@ func PrintFlags(flags *pflag.FlagSet) {
        })
 }
 
+func WaitSignal(stop chan struct{}) {
+       sigs := make(chan os.Signal, 1)
+       signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+       <-sigs
+       close(stop)
+}
+
 func WaitSignalFunc(cancel context.CancelCauseFunc) {
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 638db0f4..56f77b3c 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -18,6 +18,7 @@
 package kube
 
 import (
+       "context"
        "fmt"
        "github.com/apache/dubbo-kubernetes/operator/pkg/config"
        "github.com/apache/dubbo-kubernetes/pkg/cluster"
@@ -27,11 +28,11 @@ import (
        "github.com/apache/dubbo-kubernetes/pkg/lazy"
        "github.com/apache/dubbo-kubernetes/pkg/sleep"
        "go.uber.org/atomic"
-       istioclient "istio.io/client-go/pkg/clientset/versioned"
        kubeExtClient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime/schema"
+       "k8s.io/apimachinery/pkg/util/wait"
        kubeVersion "k8s.io/apimachinery/pkg/version"
        "k8s.io/client-go/dynamic"
        "k8s.io/client-go/kubernetes"
@@ -59,8 +60,9 @@ type client struct {
        objectFilter           kubetypes.DynamicObjectFilter
        clusterID              cluster.ID
        informerWatchesPending *atomic.Int32
+       started                atomic.Bool
        crdWatcher             kubetypes.CrdWatcher
-       istio                  istioclient.Interface
+       fastSync               bool
 }
 
 type Client interface {
@@ -82,6 +84,10 @@ type Client interface {
 
        CrdWatcher() kubetypes.CrdWatcher
 
+       RunAndWait(stop <-chan struct{}) bool
+
+       WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool
+
        Shutdown()
 }
 
@@ -231,6 +237,64 @@ func (c *client) DynamicClientFor(gvk schema.GroupVersionKind, obj *unstructured
        return dr, nil
 }
 
+func (c *client) WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
+       if c.informerWatchesPending == nil {
+               return WaitForCacheSync(name, stop, cacheSyncs...)
+       }
+       syncFns := append(cacheSyncs, func() bool {
+               return c.informerWatchesPending.Load() == 0
+       })
+       return WaitForCacheSync(name, stop, syncFns...)
+}
+
+func (c *client) Run(stop <-chan struct{}) {
+       c.informerFactory.Start(stop)
+       if c.crdWatcher != nil {
+               go c.crdWatcher.Run(stop)
+       }
+       alreadyStarted := c.started.Swap(true)
+       if alreadyStarted {
+               klog.V(2).Infof("cluster %q kube client started again", c.clusterID)
+       } else {
+               klog.Infof("cluster %q kube client started", c.clusterID)
+       }
+}
+
+func (c *client) RunAndWait(stop <-chan struct{}) bool {
+       c.Run(stop)
+       if c.fastSync {
+               if c.crdWatcher != nil {
+                       if !c.WaitForCacheSync("crd watcher", stop, c.crdWatcher.HasSynced) {
+                               return false
+                       }
+               }
+               // WaitForCacheSync will virtually never be synced on the first call, as it's called immediately after Start().
+               // This triggers a 100ms delay per call, which is often called 2-3 times in a test, delaying tests.
+               // Instead, we add an aggressive sync polling.
+               if !fastWaitForCacheSync(stop, c.informerFactory) {
+                       return false
+               }
+               err := wait.PollUntilContextTimeout(context.Background(), time.Microsecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+                       select {
+                       case <-stop:
+                               return false, fmt.Errorf("channel closed")
+                       default:
+                       }
+                       if c.informerWatchesPending.Load() == 0 {
+                               return true, nil
+                       }
+                       return false, nil
+               })
+               return err == nil
+       }
+       if c.crdWatcher != nil {
+               if !c.WaitForCacheSync("crd watcher", stop, c.crdWatcher.HasSynced) {
+                       return false
+               }
+       }
+       return c.informerFactory.WaitForCacheSync(stop)
+}
+
 func (c *client) bestEffortToGVR(gvk schema.GroupVersionKind, obj *unstructured.Unstructured, namespace string) (schema.GroupVersionResource, bool) {
        if s, f := collections.All.FindByGroupVersionAliasesKind(config.FromKubernetesGVK(gvk)); f {
                gvr := s.GroupVersionResource()
@@ -294,6 +358,20 @@ func WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.Inf
        }
 }
 
+func fastWaitForCacheSync(stop <-chan struct{}, informerFactory informerfactory.InformerFactory) bool {
+       returnImmediately := make(chan struct{})
+       close(returnImmediately)
+       err := wait.PollUntilContextTimeout(context.Background(), time.Microsecond*100, wait.ForeverTestTimeout, true, func(context.Context) (bool, error) {
+               select {
+               case <-stop:
+                       return false, fmt.Errorf("channel closed")
+               default:
+               }
+               return informerFactory.WaitForCacheSync(returnImmediately), nil
+       })
+       return err == nil
+}
+
 func (c *client) Shutdown() {
        c.informerFactory.Shutdown()
 }
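
For context, the fast-sync path added to RunAndWait boils down to one pattern: rather than blocking on the informer factory's WaitForCacheSync, which only re-checks every 100ms, poll a cheap synced predicate on a tight interval until it reports true, the stop channel closes, or a timeout expires. A minimal, self-contained sketch of that pattern follows; the helper name pollUntilSynced and the intervals are illustrative, not part of this commit.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollUntilSynced polls a cheap "have the caches synced?" predicate on a
// tight interval instead of relying on the informer factory's built-in
// 100ms re-check, returning early if the stop channel closes.
func pollUntilSynced(stop <-chan struct{}, synced func() bool) bool {
	err := wait.PollUntilContextTimeout(context.Background(), time.Millisecond, 30*time.Second, true,
		func(ctx context.Context) (bool, error) {
			select {
			case <-stop:
				return false, fmt.Errorf("channel closed")
			default:
			}
			return synced(), nil
		})
	return err == nil
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	// A predicate that flips to true after a short delay stands in for
	// informerFactory.WaitForCacheSync here.
	deadline := time.Now().Add(5 * time.Millisecond)
	fmt.Println(pollUntilSynced(stop, func() bool { return time.Now().After(deadline) }))
}
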
diff --git a/pkg/security/security.go b/pkg/security/security.go
index bcf5ac08..4476e0be 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -19,9 +19,12 @@ package security
 
 import (
        "context"
+       "fmt"
+       "google.golang.org/grpc/metadata"
        "net/http"
        "os"
        "path/filepath"
+       "strings"
        "time"
 )
 
@@ -42,14 +45,14 @@ const (
 )
 
 const (
-       CertSigner = "CertSigner"
+       BearerTokenPrefix = "Bearer "
+       K8sTokenPrefix    = "Dubbo "
+       CertSigner        = "CertSigner"
 )
 
 type AuthContext struct {
-       // grpc context
        GrpcContext context.Context
-       // http request
-       Request *http.Request
+       Request     *http.Request
 }
 
 type Authenticator interface {
@@ -57,7 +60,6 @@ type Authenticator interface {
        AuthenticatorType() string
 }
 
-// SecretItem is the cached item in in-memory secret store.
 type SecretItem struct {
        CertificateChain []byte
        PrivateKey       []byte
@@ -67,7 +69,6 @@ type SecretItem struct {
        ExpireTime       time.Time
 }
 
-// SecretManager defines secrets management interface which is used by SDS.
 type SecretManager interface {
        GenerateSecret(resourceName string) (*SecretItem, error)
 }
@@ -80,6 +81,15 @@ type SdsCertificateConfig struct {
 
 type AuthSource int
 
+const (
+       AuthSourceClientCertificate AuthSource = iota
+       AuthSourceIDToken
+)
+
+const (
+       authorizationMeta = "authorization"
+)
+
 type KubernetesInfo struct {
        PodName           string
        PodNamespace      string
@@ -147,3 +157,39 @@ func CheckWorkloadCertificate(certChainFilePath, keyFilePath, rootCertFilePath s
        }
        return true
 }
+
+func ExtractBearerToken(ctx context.Context) (string, error) {
+       md, ok := metadata.FromIncomingContext(ctx)
+       if !ok {
+               return "", fmt.Errorf("no metadata is attached")
+       }
+
+       authHeader, exists := md[authorizationMeta]
+       if !exists {
+               return "", fmt.Errorf("no HTTP authorization header exists")
+       }
+
+       for _, value := range authHeader {
+               if strings.HasPrefix(value, BearerTokenPrefix) {
+                       return strings.TrimPrefix(value, BearerTokenPrefix), nil
+               }
+       }
+
+       return "", fmt.Errorf("no bearer token exists in HTTP authorization header")
+}
+
+func ExtractRequestToken(req *http.Request) (string, error) {
+       value := req.Header.Get(authorizationMeta)
+       if value == "" {
+               return "", fmt.Errorf("no HTTP authorization header exists")
+       }
+
+       if strings.HasPrefix(value, BearerTokenPrefix) {
+               return strings.TrimPrefix(value, BearerTokenPrefix), nil
+       }
+       if strings.HasPrefix(value, K8sTokenPrefix) {
+               return strings.TrimPrefix(value, K8sTokenPrefix), nil
+       }
+
+       return "", fmt.Errorf("no bearer token exists in HTTP authorization header")
+}
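
The two helpers above are thin wrappers over gRPC metadata and HTTP headers. As a rough illustration of the gRPC side, the sketch below builds an incoming context carrying an authorization header and strips the Bearer prefix the same way ExtractBearerToken does; the local extractBearerToken and the placeholder token are assumptions for the example, not code from this commit.

package main

import (
	"context"
	"fmt"
	"strings"

	"google.golang.org/grpc/metadata"
)

// extractBearerToken mirrors the new ExtractBearerToken helper: read the
// "authorization" metadata key from an incoming gRPC context and strip the
// "Bearer " prefix.
func extractBearerToken(ctx context.Context) (string, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return "", fmt.Errorf("no metadata is attached")
	}
	for _, v := range md.Get("authorization") {
		if strings.HasPrefix(v, "Bearer ") {
			return strings.TrimPrefix(v, "Bearer "), nil
		}
	}
	return "", fmt.Errorf("no bearer token in authorization header")
}

func main() {
	// Simulate what a gRPC server would see for a request carrying a JWT.
	ctx := metadata.NewIncomingContext(context.Background(),
		metadata.Pairs("authorization", "Bearer header.payload.signature"))
	tok, err := extractBearerToken(ctx)
	fmt.Println(tok, err)
}
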
diff --git a/sail/cmd/sail-discovery/app/cmd.go b/sail/cmd/sail-discovery/app/cmd.go
index 09e29571..12eb57bb 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -78,6 +78,9 @@ func newDiscoveryCommand() *cobra.Command {
                        if err := discoveryServer.Start(stop); err != nil {
                                return fmt.Errorf("failed to start discovery service: %v", err)
                        }
+
+                       cmd.WaitSignal(stop)
+
                        return nil
                },
        }
diff --git a/sail/pkg/bootstrap/ca.go b/sail/pkg/bootstrap/ca.go
index 0ad04100..6b88a03e 100644
--- a/sail/pkg/bootstrap/ca.go
+++ b/sail/pkg/bootstrap/ca.go
@@ -20,21 +20,28 @@ package bootstrap
 import (
        "bytes"
        "context"
+       "encoding/json"
        "fmt"
        "github.com/apache/dubbo-kubernetes/pkg/config/constants"
        "github.com/apache/dubbo-kubernetes/pkg/env"
        "github.com/apache/dubbo-kubernetes/pkg/security"
        "github.com/apache/dubbo-kubernetes/sail/pkg/features"
+       securityModel "github.com/apache/dubbo-kubernetes/sail/pkg/security/model"
        "github.com/apache/dubbo-kubernetes/security/pkg/cmd"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
        caserver "github.com/apache/dubbo-kubernetes/security/pkg/server/ca"
+       "github.com/apache/dubbo-kubernetes/security/pkg/server/ca/authenticate"
+       "github.com/apache/dubbo-kubernetes/security/pkg/util"
        "github.com/fsnotify/fsnotify"
+       "google.golang.org/grpc"
+       "istio.io/api/security/v1beta1"
        "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/klog/v2"
        "os"
        "path"
+       "strings"
        "time"
 )
 
@@ -49,6 +56,10 @@ type caOptions struct {
 }
 
 var (
+       trustedIssuer = env.Register("TOKEN_ISSUER", "",
+               "OIDC token issuer. If set, will be used to check the tokens.")
+       audience = env.Register("AUDIENCE", "",
+               "Expected audience in the tokens. ")
        LocalCertDir = env.Register("ROOT_CA_DIR", "./etc/cacerts",
                "Location of a local or mounted CA root")
        useRemoteCerts = env.Register("USE_REMOTE_CERTS", false,
@@ -59,6 +70,8 @@ var (
        // TODO: Likely to be removed and added to mesh config
        k8sSigner = env.Register("K8S_SIGNER", "",
                "Kubernetes CA Signer type. Valid from Kubernetes 1.18").Get()
+       k8sInCluster = env.Register("KUBERNETES_SERVICE_HOST", "",
+               "Kubernetes service host, set automatically when running in-cluster")
        workloadCertTTL = env.Register("DEFAULT_WORKLOAD_CERT_TTL",
                cmd.DefaultWorkloadCertTTL,
                "The default TTL of issued workload certificates. Applied when the client sets a "+
@@ -88,6 +101,55 @@ var (
                        "rotation. This interval is suggested to be larger than 10 minutes.")
 )
 
+func (s *Server) initCAServer(ca caserver.CertificateAuthority, opts *caOptions) {
+       caServer, startErr := caserver.New(ca, maxWorkloadCertTTL.Get(), opts.Authenticators)
+       if startErr != nil {
+               klog.Errorf("failed to create dubbo ca server: %v", startErr)
+       }
+       s.caServer = caServer
+}
+
+func (s *Server) RunCA(grpc *grpc.Server) {
+       iss := trustedIssuer.Get()
+       aud := audience.Get()
+
+       token, err := os.ReadFile(securityModel.ThirdPartyJwtPath)
+       if err == nil {
+               tok, err := detectAuthEnv(string(token))
+               if err != nil {
+                       klog.Warningf("Starting with invalid K8S JWT token: %v", err)
+               } else {
+                       if iss == "" {
+                               iss = tok.Iss
+                       }
+                       if len(tok.Aud) > 0 && len(aud) == 0 {
+                               aud = tok.Aud[0]
+                       }
+               }
+       }
+
+       // TODO: if not set, parse Dubbod's own token (if present) and get the issuer. The same issuer is used
+       // for all tokens - no need to configure twice. The token may also include cluster info to auto-configure
+       // networking properties.
+       if iss != "" && // issuer set explicitly or extracted from our own JWT
+               k8sInCluster.Get() == "" { // not running in cluster - in cluster use direct call to apiserver
+               // Add a custom authenticator using standard JWT validation, if not running in K8S
+               // When running inside K8S - we can use the built-in validator, which also checks pod removal (invalidation).
+               jwtRule := v1beta1.JWTRule{Issuer: iss, Audiences: []string{aud}}
+               oidcAuth, err := authenticate.NewJwtAuthenticator(&jwtRule, nil)
+               if err == nil {
+                       s.caServer.Authenticators = append(s.caServer.Authenticators, oidcAuth)
+                       klog.Info("Using out-of-cluster JWT authentication")
+               } else {
+                       klog.Info("K8S token doesn't support OIDC, using only in-cluster auth")
+               }
+       }
+
+       s.caServer.Register(grpc)
+
+       klog.Info("Dubbod CA has started")
+}
+
 func (s *Server) loadCACerts(caOpts *caOptions, dir string) error {
        if s.kubeClient == nil {
                return nil
@@ -313,6 +375,27 @@ func (s *Server) handleCACertsFileWatch() {
        }
 }
 
+func detectAuthEnv(jwt string) (*authenticate.JwtPayload, error) {
+       jwtSplit := strings.Split(jwt, ".")
+       if len(jwtSplit) != 3 {
+               return nil, fmt.Errorf("invalid JWT parts: %s", jwt)
+       }
+       payload := jwtSplit[1]
+
+       payloadBytes, err := util.DecodeJwtPart(payload)
+       if err != nil {
+               return nil, fmt.Errorf("failed to decode jwt: %v", err.Error())
+       }
+
+       structuredPayload := &authenticate.JwtPayload{}
+       err = json.Unmarshal(payloadBytes, &structuredPayload)
+       if err != nil {
+               return nil, fmt.Errorf("failed to unmarshal jwt: %v", err.Error())
+       }
+
+       return structuredPayload, nil
+}
+
 func handleEvent(s *Server) {
        klog.Info("Update Dubbod cacerts")
 
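
detectAuthEnv above only needs the unverified claims in order to discover a default issuer and audience. For orientation, here is a standalone sketch of that decode step; the names decodePayload and jwtPayload are illustrative, and signature verification is deliberately left to the OIDC authenticator, as in the commit.

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
)

type jwtPayload struct {
	Aud []string `json:"aud"`
	Iss string   `json:"iss"`
	Sub string   `json:"sub"`
}

// decodePayload splits the compact JWS into its three dot-separated parts,
// base64url-decodes the middle (payload) segment, and unmarshals the JSON
// claims to recover issuer and audience without verifying the signature.
func decodePayload(jwt string) (*jwtPayload, error) {
	parts := strings.Split(jwt, ".")
	if len(parts) != 3 {
		return nil, fmt.Errorf("invalid JWT: expected 3 parts, got %d", len(parts))
	}
	seg := parts[1]
	if l := len(seg) % 4; l > 0 {
		seg += strings.Repeat("=", 4-l) // re-add stripped base64 padding
	}
	raw, err := base64.URLEncoding.DecodeString(seg)
	if err != nil {
		return nil, err
	}
	p := &jwtPayload{}
	if err := json.Unmarshal(raw, p); err != nil {
		return nil, err
	}
	return p, nil
}

func main() {
	// Unverified sample: the header and signature segments are irrelevant to the payload decode.
	p, err := decodePayload("e30.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50In0.sig")
	fmt.Println(p, err)
}
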
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index a45e9b11..e4facdc9 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -49,6 +49,7 @@ type SailArgs struct {
        CtrlZOptions       *ctrlz.Options
        KeepaliveOptions   *keepalive.Options
        KrtDebugger        *krt.DebugHandler `json:"-"`
+       JwtRule            string
 }
 
 type DiscoveryServerOptions struct {
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 89e49947..fed67bdb 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -46,6 +46,7 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/xds"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
+       caserver "github.com/apache/dubbo-kubernetes/security/pkg/server/ca"
        "github.com/fsnotify/fsnotify"
        grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
        "golang.org/x/net/http2"
@@ -100,12 +101,18 @@ type Server struct {
        CA                  *ca.DubboCA
        dnsNames            []string
 
+       caServer *caserver.Server
+
        certMu     sync.RWMutex
        dubbodCert *tls.Certificate
 
        dubbodCertBundleWatcher *keycertbundle.Watcher
+
+       readinessProbes map[string]readinessProbe
 }
 
+type readinessProbe func() bool
+
 func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
        e := model.NewEnvironment()
        e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
@@ -199,6 +206,26 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
                return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
        }
 
+       if s.kubeClient != nil {
+               s.initSecureWebhookServer(args)
+               if err := s.initConfigValidation(args); err != nil {
+                       return nil, fmt.Errorf("error initializing config validator: %v", err)
+               }
+       }
+
+       // TODO initRegistryEventHandlers?
+
+       // TODO initDiscoveryService?
+
+       s.startCA(caOpts)
+
+       if s.kubeClient != nil {
+               s.addStartFunc("kube client", func(stop <-chan struct{}) error {
+                       s.kubeClient.RunAndWait(stop)
+                       return nil
+               })
+       }
+
        return s, nil
 }
 
@@ -259,6 +286,29 @@ func (s *Server) Start(stop <-chan struct{}) error {
        return nil
 }
 
+func (s *Server) startCA(caOpts *caOptions) {
+       if s.CA == nil && s.RA == nil {
+               return
+       }
+       // init the RA server if configured, else init the CA server
+       if s.RA != nil {
+               klog.Infof("initializing CA server with RA")
+               s.initCAServer(s.RA, caOpts)
+       } else if s.CA != nil {
+               klog.Infof("initializing CA server with Dubbod CA")
+               s.initCAServer(s.CA, caOpts)
+       }
+       s.addStartFunc("ca", func(stop <-chan struct{}) error {
+               grpcServer := s.secureGrpcServer
+               if s.secureGrpcServer == nil {
+                       grpcServer = s.grpcServer
+               }
+               klog.Infof("starting CA server")
+               s.RunCA(grpcServer)
+               return nil
+       })
+}
+
 func (s *Server) initKubeClient(args *SailArgs) error {
        if s.kubeClient != nil {
                // Already initialized by startup arguments
@@ -677,6 +727,17 @@ func (s *Server) initDubbodCerts(args *SailArgs, host string) error {
        return err
 }
 
+func (s *Server) dubbodReadyHandler(w http.ResponseWriter, _ *http.Request) {
+       for name, fn := range s.readinessProbes {
+               if ready := fn(); !ready {
+                       klog.Warningf("%s is not ready", name)
+                       w.WriteHeader(http.StatusServiceUnavailable)
+                       return
+               }
+       }
+       w.WriteHeader(http.StatusOK)
+}
+
 func getDNSNames(args *SailArgs, host string) []string {
        // Append custom hostname if there is any
        customHost := features.DubbodServiceCustomHost
diff --git a/sail/pkg/bootstrap/validation.go b/sail/pkg/bootstrap/validation.go
new file mode 100644
index 00000000..7f7fd73a
--- /dev/null
+++ b/sail/pkg/bootstrap/validation.go
@@ -0,0 +1,13 @@
+package bootstrap
+
+import (
+       "k8s.io/klog/v2"
+)
+
+func (s *Server) initConfigValidation(args *SailArgs) error {
+       if s.kubeClient == nil {
+               return nil
+       }
+       klog.Info("initializing config validator")
+       return nil
+}
diff --git a/sail/pkg/bootstrap/webhook.go b/sail/pkg/bootstrap/webhook.go
new file mode 100644
index 00000000..6fd29e77
--- /dev/null
+++ b/sail/pkg/bootstrap/webhook.go
@@ -0,0 +1,52 @@
+package bootstrap
+
+import (
+       "crypto/tls"
+       sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
+       "k8s.io/klog/v2"
+       "log"
+       "net/http"
+       "strings"
+)
+
+type httpServerErrorLogWriter struct{}
+
+func (*httpServerErrorLogWriter) Write(p []byte) (int, error) {
+       m := strings.TrimSuffix(string(p), "\n")
+       if strings.HasPrefix(m, "http: TLS handshake error") && strings.HasSuffix(m, ": EOF") {
+               klog.V(2).Info(m)
+       } else {
+               klog.Info(m)
+       }
+       return len(p), nil
+}
+
+func (s *Server) initSecureWebhookServer(args *SailArgs) {
+       // create the https server for hosting the k8s injectionWebhook handlers.
+       if args.ServerOptions.HTTPSAddr == "" {
+               s.httpsMux = s.httpMux
+               klog.Infof("HTTPS port is disabled, multiplexing webhooks on the httpAddr %v", args.ServerOptions.HTTPAddr)
+               return
+       }
+
+       tlsConfig := &tls.Config{
+               GetCertificate: s.getDubbodCertificate,
+               MinVersion:     tls.VersionTLS12,
+               CipherSuites:   args.ServerOptions.TLSOptions.CipherSuits,
+       }
+       // Compliance for control plane validation and injection webhook server.
+       sec_model.EnforceGoCompliance(tlsConfig)
+
+       klog.Info("initializing secure webhook server for dubbod webhooks")
+       // create the https server for hosting the k8s injectionWebhook handlers.
+       s.httpsMux = http.NewServeMux()
+       s.httpsServer = &http.Server{
+               Addr:      args.ServerOptions.HTTPSAddr,
+               ErrorLog:  log.New(&httpServerErrorLogWriter{}, "", 0),
+               Handler:   s.httpsMux,
+               TLSConfig: tlsConfig,
+       }
+
+       // register dubbodReadyHandler on the httpsMux so that readiness can also be checked remotely
+       s.httpsMux.HandleFunc("/ready", s.dubbodReadyHandler)
+}
diff --git a/sail/pkg/model/xds_cache.go b/sail/pkg/model/xds_cache.go
index 7ebeee13..10e53ef6 100644
--- a/sail/pkg/model/xds_cache.go
+++ b/sail/pkg/model/xds_cache.go
@@ -21,7 +21,9 @@ import (
        "github.com/apache/dubbo-kubernetes/sail/pkg/features"
 )
 
-type XdsCache interface{}
+type XdsCache interface {
+       Run(stop <-chan struct{})
+}
 
 type DisabledCache struct{}
 
@@ -44,3 +46,8 @@ func NewXdsCache() XdsCache {
 
        return cache
 }
+
+func (x XdsCacheImpl) Run(stop <-chan struct{}) {}
+
+func (d DisabledCache) Run(stop <-chan struct{}) {
+}
diff --git a/sail/pkg/security/model/authentication.go b/sail/pkg/security/model/authentication.go
new file mode 100644
index 00000000..a4c135e5
--- /dev/null
+++ b/sail/pkg/security/model/authentication.go
@@ -0,0 +1,5 @@
+package model
+
+const (
+       ThirdPartyJwtPath = "/var/run/secrets/tokens/dubbo-token"
+)
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index a4c58ed8..57d9d4ba 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -29,25 +29,36 @@ import (
        "time"
 )
 
+type DebounceOptions struct {
+       DebounceAfter     time.Duration
+       debounceMax       time.Duration
+       enableEDSDebounce bool
+}
+
 type DiscoveryServer struct {
-       Env                *model.Environment
-       serverReady        atomic.Bool
-       DiscoveryStartTime time.Time
-       ClusterAliases     map[cluster.ID]cluster.ID
-       Cache              model.XdsCache
-       pushQueue          *PushQueue
-       krtDebugger        *krt.DebugHandler
-       InboundUpdates     *atomic.Int64
-       CommittedUpdates   *atomic.Int64
-       RequestRateLimit   *rate.Limiter
-       ProxyNeedsPush     func(proxy *model.Proxy, req *model.PushRequest) (*model.PushRequest, bool)
+       Env                 *model.Environment
+       serverReady         atomic.Bool
+       DiscoveryStartTime  time.Time
+       ClusterAliases      map[cluster.ID]cluster.ID
+       Cache               model.XdsCache
+       pushQueue           *PushQueue
+       krtDebugger         *krt.DebugHandler
+       InboundUpdates      *atomic.Int64
+       CommittedUpdates    *atomic.Int64
+       RequestRateLimit    *rate.Limiter
+       ProxyNeedsPush      func(proxy *model.Proxy, req *model.PushRequest) (*model.PushRequest, bool)
+       pushChannel         chan *model.PushRequest
+       DebounceOptions     DebounceOptions
+       concurrentPushLimit chan struct{}
 }
 
 func NewDiscoveryServer(env *model.Environment, clusterAliases map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
        out := &DiscoveryServer{
-               Env:         env,
-               Cache:       env.Cache,
-               krtDebugger: debugger,
+               Env:              env,
+               Cache:            env.Cache,
+               krtDebugger:      debugger,
+               InboundUpdates:   atomic.NewInt64(0),
+               CommittedUpdates: atomic.NewInt64(0),
        }
        out.ClusterAliases = make(map[cluster.ID]cluster.ID)
        for alias := range clusterAliases {
@@ -61,6 +72,12 @@ func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
        discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
 }
 
+func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
+       go s.handleUpdates(stopCh)
+       go s.sendPushes(stopCh)
+       go s.Cache.Run(stopCh)
+}
+
 func (s *DiscoveryServer) CachesSynced() {
        klog.Infof("All caches have been synced up in %v, marking server ready", time.Since(s.DiscoveryStartTime))
        s.serverReady.Store(true)
@@ -70,6 +87,21 @@ func (s *DiscoveryServer) Shutdown() {
        s.pushQueue.ShutDown()
 }
 
+func (s *DiscoveryServer) Push(req *model.PushRequest) {}
+
 func (s *DiscoveryServer) globalPushContext() *model.PushContext {
        return s.Env.PushContext()
 }
+
+func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
+       debounce(s.pushChannel, stopCh, s.DebounceOptions, s.Push, s.CommittedUpdates)
+}
+
+func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
+       doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
+}
+
+func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts DebounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
+}
+
+func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {}
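
handleUpdates is wired to a debounce helper whose body is still a stub in this commit. For readers unfamiliar with the pattern, the sketch below shows one plausible minimal shape: collapse a burst of push requests into a single push once either a quiet period (DebounceAfter) or a hard cap (debounceMax) has elapsed. All names and the merging policy here are assumptions, not the commit's implementation.

package xds

import (
	"time"
)

type pushRequest struct{ merged int }

type debounceOptions struct {
	debounceAfter time.Duration // required quiet period before pushing
	debounceMax   time.Duration // upper bound on how long a burst may be held
}

// debounceLoop collapses bursts of requests from ch into single pushes:
// it pushes once no new request has arrived for debounceAfter, or once
// debounceMax has elapsed since the first request of the burst.
func debounceLoop(ch <-chan *pushRequest, stop <-chan struct{}, opts debounceOptions, pushFn func(*pushRequest)) {
	var timer <-chan time.Time
	var firstEvent, lastEvent time.Time
	pending := 0

	for {
		select {
		case <-stop:
			return
		case <-ch:
			lastEvent = time.Now()
			if pending == 0 {
				firstEvent = lastEvent
			}
			pending++
			timer = time.After(opts.debounceAfter)
		case <-timer:
			quiet := time.Since(lastEvent)
			held := time.Since(firstEvent)
			if quiet >= opts.debounceAfter || held >= opts.debounceMax {
				pushFn(&pushRequest{merged: pending})
				pending, timer = 0, nil
			} else {
				timer = time.After(opts.debounceAfter - quiet)
			}
		}
	}
}
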
diff --git a/security/pkg/server/ca/authenticate/cert_authenticator.go b/security/pkg/server/ca/authenticate/cert_authenticator.go
new file mode 100644
index 00000000..e40b2507
--- /dev/null
+++ b/security/pkg/server/ca/authenticate/cert_authenticator.go
@@ -0,0 +1,85 @@
+package authenticate
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/peer"
+       "net/http"
+)
+
+const (
+       ClientCertAuthenticatorType = "ClientCertAuthenticator"
+)
+
+type ClientCertAuthenticator struct{}
+
+var _ security.Authenticator = &ClientCertAuthenticator{}
+
+func (cca *ClientCertAuthenticator) AuthenticatorType() string {
+       return ClientCertAuthenticatorType
+}
+
+// Authenticate extracts identities from presented client certificates. This
+// method assumes that the certificate chain has been properly validated before
+// this method is called. In other words, this method does not do certificate
+// chain validation itself.
+func (cca *ClientCertAuthenticator) Authenticate(authCtx security.AuthContext) (*security.Caller, error) {
+       if authCtx.GrpcContext != nil {
+               return cca.authenticateGrpc(authCtx.GrpcContext)
+       }
+       if authCtx.Request != nil {
+               return cca.authenticateHTTP(authCtx.Request)
+       }
+       return nil, nil
+}
+
+func (cca *ClientCertAuthenticator) authenticateGrpc(ctx context.Context) (*security.Caller, error) {
+       peer, ok := peer.FromContext(ctx)
+       if !ok || peer.AuthInfo == nil {
+               return nil, fmt.Errorf("no client certificate is presented")
+       }
+
+       if authType := peer.AuthInfo.AuthType(); authType != "tls" {
+               return nil, fmt.Errorf("unsupported auth type: %q", authType)
+       }
+
+       tlsInfo := peer.AuthInfo.(credentials.TLSInfo)
+       chains := tlsInfo.State.VerifiedChains
+       if len(chains) == 0 || len(chains[0]) == 0 {
+               return nil, fmt.Errorf("no verified chain is found")
+       }
+
+       ids, err := util.ExtractIDs(chains[0][0].Extensions)
+       if err != nil {
+               return nil, err
+       }
+
+       return &security.Caller{
+               AuthSource: security.AuthSourceClientCertificate,
+               Identities: ids,
+       }, nil
+}
+
+func (cca *ClientCertAuthenticator) authenticateHTTP(req *http.Request) (*security.Caller, error) {
+       if req.TLS == nil || req.TLS.VerifiedChains == nil {
+               return nil, fmt.Errorf("no client certificate is presented")
+       }
+
+       chains := req.TLS.VerifiedChains
+       if len(chains) == 0 || len(chains[0]) == 0 {
+               return nil, fmt.Errorf("no verified chain is found")
+       }
+
+       ids, err := util.ExtractIDs(chains[0][0].Extensions)
+       if err != nil {
+               return nil, err
+       }
+
+       return &security.Caller{
+               AuthSource: security.AuthSourceClientCertificate,
+               Identities: ids,
+       }, nil
+}
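
The identities returned above come from util.ExtractIDs over the leaf certificate's SAN extension. As a rough, self-contained approximation (not the commit's helper): Go's x509 parser already exposes URI SANs, so SPIFFE IDs can be read straight off the verified leaf, e.g. the first certificate of VerifiedChains[0].

package authenticate

import (
	"crypto/x509"
)

// spiffeIDs approximates util.ExtractIDs for the common case: collect the
// URI SANs of the verified leaf certificate that use the spiffe scheme.
// With gRPC peer info this would be called as
// spiffeIDs(tlsInfo.State.VerifiedChains[0][0]).
func spiffeIDs(leaf *x509.Certificate) []string {
	var ids []string
	for _, u := range leaf.URIs {
		if u.Scheme == "spiffe" {
			ids = append(ids, u.String())
		}
	}
	return ids
}
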
diff --git a/security/pkg/server/ca/authenticate/oidc.go b/security/pkg/server/ca/authenticate/oidc.go
new file mode 100644
index 00000000..9cbb1d8d
--- /dev/null
+++ b/security/pkg/server/ca/authenticate/oidc.go
@@ -0,0 +1,117 @@
+package authenticate
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+       "github.com/apache/dubbo-kubernetes/pkg/security"
+       "github.com/apache/dubbo-kubernetes/pkg/spiffe"
+       oidc "github.com/coreos/go-oidc/v3/oidc"
+       "istio.io/api/security/v1beta1"
+       "strings"
+)
+
+const (
+       IDTokenAuthenticatorType = "IDTokenAuthenticator"
+)
+
+type JwtPayload struct {
+       Aud []string `json:"aud"`
+       Exp int      `json:"exp"`
+       Iss string   `json:"iss"`
+       Sub string   `json:"sub"`
+}
+
+type JwtAuthenticator struct {
+       // holder of a mesh configuration for dynamically updating trust domain
+       meshHolder mesh.Holder
+       audiences  []string
+       verifier   *oidc.IDTokenVerifier
+}
+
+func NewJwtAuthenticator(jwtRule *v1beta1.JWTRule, meshWatcher mesh.Watcher) (*JwtAuthenticator, error) {
+       issuer := jwtRule.GetIssuer()
+       jwksURL := jwtRule.GetJwksUri()
+       // The key of a JWT issuer may change, so the key may need to be updated.
+       // Based on https://pkg.go.dev/github.com/coreos/go-oidc/v3/oidc#NewRemoteKeySet
+       // the oidc library handles caching and cache invalidation. Thus, the verifier
+       // is only created once in the constructor.
+       var verifier *oidc.IDTokenVerifier
+       if len(jwksURL) == 0 {
+               // OIDC discovery is used if jwksURL is not set.
+               provider, err := oidc.NewProvider(context.Background(), issuer)
+               // OIDC discovery may fail, e.g. http request for the OIDC server may fail.
+               if err != nil {
+                       return nil, fmt.Errorf("failed at creating an OIDC provider for %v: %v", issuer, err)
+               }
+               verifier = provider.Verifier(&oidc.Config{SkipClientIDCheck: true})
+       } else {
+               keySet := oidc.NewRemoteKeySet(context.Background(), jwksURL)
+               verifier = oidc.NewVerifier(issuer, keySet, &oidc.Config{SkipClientIDCheck: true})
+       }
+       return &JwtAuthenticator{
+               meshHolder: meshWatcher,
+               verifier:   verifier,
+               audiences:  jwtRule.Audiences,
+       }, nil
+}
+
+func (j *JwtAuthenticator) Authenticate(authRequest security.AuthContext) (*security.Caller, error) {
+       if authRequest.GrpcContext != nil {
+               bearerToken, err := security.ExtractBearerToken(authRequest.GrpcContext)
+               if err != nil {
+                       return nil, fmt.Errorf("ID token extraction error: %v", err)
+               }
+               return j.authenticate(authRequest.GrpcContext, bearerToken)
+       }
+       if authRequest.Request != nil {
+               bearerToken, err := security.ExtractRequestToken(authRequest.Request)
+               if err != nil {
+                       return nil, fmt.Errorf("target JWT extraction error: %v", err)
+               }
+               return j.authenticate(authRequest.Request.Context(), bearerToken)
+       }
+       return nil, nil
+}
+
+func (j *JwtAuthenticator) authenticate(ctx context.Context, bearerToken string) (*security.Caller, error) {
+       idToken, err := j.verifier.Verify(ctx, bearerToken)
+       if err != nil {
+               return nil, fmt.Errorf("failed to verify the JWT token (error %v)", err)
+       }
+
+       sa := JwtPayload{}
+       // "aud" for trust domain, "sub" has "system:serviceaccount:$namespace:$serviceaccount".
+       // In the future, trust domain may use another field once a standard is defined.
+       if err := idToken.Claims(&sa); err != nil {
+               return nil, fmt.Errorf("failed to extract claims from ID token: %v", err)
+       }
+       if !strings.HasPrefix(sa.Sub, "system:serviceaccount") {
+               return nil, fmt.Errorf("invalid sub %v", sa.Sub)
+       }
+       parts := strings.Split(sa.Sub, ":")
+       ns := parts[2]
+       ksa := parts[3]
+       if !checkAudience(sa.Aud, j.audiences) {
+               return nil, fmt.Errorf("invalid audiences %v", sa.Aud)
+       }
+       return &security.Caller{
+               AuthSource: security.AuthSourceIDToken,
+               Identities: []string{spiffe.MustGenSpiffeURI(j.meshHolder.Mesh(), ns, ksa)},
+       }, nil
+}
+
+func (j JwtAuthenticator) AuthenticatorType() string {
+       return IDTokenAuthenticatorType
+}
+
+func checkAudience(audToCheck []string, audExpected []string) bool {
+       for _, a := range audToCheck {
+               for _, b := range audExpected {
+                       if a == b {
+                               return true
+                       }
+               }
+       }
+       return false
+}
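
After verification, the authenticator splits the Kubernetes service-account subject claim to recover namespace and service account. The sketch below isolates that parsing step; unlike the commit's code, it also validates the number of parts, which is added hardening in this illustration.

package main

import (
	"fmt"
	"strings"
)

// parseK8sSubject splits the Kubernetes service-account subject claim
// ("system:serviceaccount:<namespace>:<serviceaccount>") into its namespace
// and service-account parts, as the OIDC authenticator does after verifying
// the token.
func parseK8sSubject(sub string) (ns, sa string, err error) {
	parts := strings.Split(sub, ":")
	if len(parts) != 4 || parts[0] != "system" || parts[1] != "serviceaccount" {
		return "", "", fmt.Errorf("invalid sub %q", sub)
	}
	return parts[2], parts[3], nil
}

func main() {
	ns, sa, err := parseK8sSubject("system:serviceaccount:dubbo-system:dubbod")
	fmt.Println(ns, sa, err) // dubbo-system dubbod <nil>
}
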
diff --git a/security/pkg/server/ca/server.go b/security/pkg/server/ca/server.go
index a8479613..cb5af79a 100644
--- a/security/pkg/server/ca/server.go
+++ b/security/pkg/server/ca/server.go
@@ -18,10 +18,36 @@
 package ca
 
 import (
+       "github.com/apache/dubbo-kubernetes/pkg/security"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
        "github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
+       "google.golang.org/grpc"
+       pb "istio.io/api/security/v1alpha1"
+       "time"
 )
 
+type Server struct {
+       pb.UnimplementedIstioCertificateServiceServer
+       Authenticators []security.Authenticator
+       serverCertTTL  time.Duration
+       ca             CertificateAuthority
+}
+
+func New(ca CertificateAuthority, ttl time.Duration, authenticators []security.Authenticator) (*Server, error) {
+       certBundle := ca.GetCAKeyCertBundle()
+       if len(certBundle.GetRootCertPem()) != 0 {
+               RecordCertsExpiry(certBundle)
+       }
+
+       server := &Server{
+               Authenticators: authenticators,
+               serverCertTTL:  ttl,
+               ca:             ca,
+       }
+
+       return server, nil
+}
+
 type CertificateAuthority interface {
        Sign(csrPEM []byte, opts ca.CertOpts) ([]byte, error)
        SignWithCertChain(csrPEM []byte, opts ca.CertOpts) ([]string, error)
@@ -29,3 +55,7 @@ type CertificateAuthority interface {
 }
 
 func RecordCertsExpiry(keyCertBundle *util.KeyCertBundle) {}
+
+func (s *Server) Register(grpcServer *grpc.Server) {
+       pb.RegisterIstioCertificateServiceServer(grpcServer, s)
+}
diff --git a/security/pkg/util/jwtutil.go b/security/pkg/util/jwtutil.go
new file mode 100644
index 00000000..5d56a554
--- /dev/null
+++ b/security/pkg/util/jwtutil.go
@@ -0,0 +1,14 @@
+package util
+
+import (
+       "encoding/base64"
+       "strings"
+)
+
+func DecodeJwtPart(seg string) ([]byte, error) {
+       if l := len(seg) % 4; l > 0 {
+               seg += strings.Repeat("=", 4-l)
+       }
+
+       return base64.URLEncoding.DecodeString(seg)
+}
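
DecodeJwtPart exists because JWT segments are unpadded base64url: re-adding '=' padding lets base64.URLEncoding decode them. The same bytes can be obtained with base64.RawURLEncoding, as this small self-contained check shows; the sample segment encodes {"iss":"kubernetes/serviceaccount"}.

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	seg := "eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50In0" // unpadded base64url JSON
	// RawURLEncoding accepts the unpadded form directly...
	a, _ := base64.RawURLEncoding.DecodeString(seg)
	// ...which is equivalent to re-adding '=' padding and using URLEncoding,
	// as DecodeJwtPart does.
	b, _ := base64.URLEncoding.DecodeString(seg + "=")
	fmt.Println(string(a) == string(b)) // true
}
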

