This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 5db06b24 [security] add aegis corecode (#796)
5db06b24 is described below
commit 5db06b247bb2dd54834b9061571c8d0bdab478d2
Author: Jian Zhong <[email protected]>
AuthorDate: Tue Sep 30 00:55:59 2025 +0800
[security] add aegis corecode (#796)
---
.licenserc.yaml | 23 ----
go.mod | 4 +-
go.sum | 4 +-
pkg/cmd/cmd.go | 7 ++
pkg/kube/client.go | 82 ++++++++++++++-
pkg/security/security.go | 58 ++++++++--
sail/cmd/sail-discovery/app/cmd.go | 3 +
sail/pkg/bootstrap/ca.go | 83 +++++++++++++++
sail/pkg/bootstrap/options.go | 1 +
sail/pkg/bootstrap/server.go | 61 +++++++++++
sail/pkg/bootstrap/validation.go | 13 +++
sail/pkg/bootstrap/webhook.go | 52 +++++++++
sail/pkg/model/xds_cache.go | 9 +-
sail/pkg/security/model/authentication.go | 5 +
sail/pkg/xds/discovery.go | 60 ++++++++---
.../server/ca/authenticate/cert_authenticator.go | 85 +++++++++++++++
security/pkg/server/ca/authenticate/oidc.go | 117 +++++++++++++++++++++
security/pkg/server/ca/server.go | 30 ++++++
security/pkg/util/jwtutil.go | 14 +++
19 files changed, 661 insertions(+), 50 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3de0b608..4ca86d88 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -24,8 +24,6 @@ header:
- '**/.settings/*'
- '**/.classpath'
- '**/.project'
- - '**/target/**'
- - '**/generated/**'
- '**/*.log'
- '**/codestyle/*'
- '**/resources/META-INF/**'
@@ -36,9 +34,7 @@ header:
- '**/*.load'
- '**/*.flex'
- '**/*.fc'
- - '**/*.javascript'
- '**/*.properties'
- - '**/*.thrift'
- '**/*.pb.go'
- '**/*.sh'
- '**/*.bat'
@@ -54,7 +50,6 @@ header:
- '**/*.schemas'
- '**/*.nojekyll'
- '**/.browserslistrc'
- - '**/yarn.lock'
- '.git/'
- '.github/**'
- '**/.gitignore'
@@ -64,33 +59,15 @@ header:
- 'compiler/**'
- '.gitmodules'
- '**/.gitrepo'
- - '.mvn'
- - 'mvnw'
- - 'mvnw.cmd'
- 'LICENSE'
- 'NOTICE'
- 'CNAME'
- - 'Jenkinsfile'
- - '**/vendor/**'
- '**/go.sum'
- - '**/addons/manifests/**'
- - '**/dubbo-ui/dist/**'
- - '**/ui/**'
- - '**/governance_config_mock.go'
- '**/templates/**'
- '**/testdata/**'
- '**/deploy.tpl'
- '**/docs/**'
- '**/.husky/pre-commit'
- - '**/.nvmrc'
- - '**/**.txtar'
- - '**/**gen.go'
- - '**/api/**'
- - '**/deploy/**'
- - '**/mk/**'
- - '**/test/**'
- - '**/tools/**'
- - '**/apis/**'
comment: on-failure
license-location-threshold: 130
diff --git a/go.mod b/go.mod
index 2a974281..c9d60e36 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
github.com/chzyer/readline v1.5.1
github.com/containers/image/v5 v5.34.0
github.com/containers/storage v1.57.1
+ github.com/coreos/go-oidc/v3 v3.12.0
github.com/docker/cli v28.3.3+incompatible
github.com/docker/docker v27.5.1+incompatible
github.com/docker/docker-credential-helpers v0.9.3
@@ -58,12 +59,12 @@ require (
golang.org/x/net v0.43.0
golang.org/x/sys v0.35.0
golang.org/x/term v0.34.0
+ golang.org/x/time v0.12.0
google.golang.org/grpc v1.74.2
google.golang.org/protobuf v1.36.7
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.18.6
istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee
- istio.io/client-go v1.26.0-alpha.0.0.20250908201345-99e026bfe54f
k8s.io/api v0.33.4
k8s.io/apiextensions-apiserver v0.33.4
k8s.io/apimachinery v0.33.4
@@ -239,7 +240,6 @@ require (
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/text v0.28.0 // indirect
- golang.org/x/time v0.12.0 // indirect
google.golang.org/genproto/googleapis/api
v0.0.0-20250811230008-5f3141c8851a // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250811230008-5f3141c8851a // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
diff --git a/go.sum b/go.sum
index bfe17708..310ea56b 100644
--- a/go.sum
+++ b/go.sum
@@ -201,6 +201,8 @@ github.com/containers/image/v5 v5.34.0
h1:HPqQaDUsox/3mC1pbOyLAIQEp0JhQqiUZ+6JiF
github.com/containers/image/v5 v5.34.0/go.mod
h1:/WnvUSEfdqC/ahMRd4YJDBLrpYWkGl018rB77iB3FDo=
github.com/containers/storage v1.57.1
h1:hKPoFsuBcB3qTzBxa4IFpZMRzUuL5Xhv/BE44W0XHx8=
github.com/containers/storage v1.57.1/go.mod
h1:i/Hb4lu7YgFr9G0K6BMjqW0BLJO1sFsnWQwj2UoWCUM=
+github.com/coreos/go-oidc/v3 v3.12.0
h1:sJk+8G2qq94rDI6ehZ71Bol3oUHy63qNYmkiSjrc/Jo=
+github.com/coreos/go-oidc/v3 v3.12.0/go.mod
h1:gE3LgjOgFoHi9a4ce4/tJczr0Ai2/BoDhf0r5lltWI0=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod
h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/creack/pty v1.1.17/go.mod
h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
@@ -868,8 +870,6 @@ honnef.co/go/tools
v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee
h1:pDLJDbgzc69hw3EQHIhO+j/efHketfwARg3eL+QIqss=
istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee/go.mod
h1:BD3qv/ekm16kvSgvSpuiDawgKhEwG97wx849CednJSg=
-istio.io/client-go v1.26.0-alpha.0.0.20250908201345-99e026bfe54f
h1:gTRu3JTMDWZ2Qi7RcYIHXK2PPmiCtRAZ47QOiSooCxM=
-istio.io/client-go v1.26.0-alpha.0.0.20250908201345-99e026bfe54f/go.mod
h1:k9fittgsfNR2AkUGLYRkEyTbSa1marHI4oqsGkE4AEI=
k8s.io/api v0.33.4 h1:oTzrFVNPXBjMu0IlpA2eDDIU49jsuEorGHB4cvKupkk=
k8s.io/api v0.33.4/go.mod h1:VHQZ4cuxQ9sCUMESJV5+Fe8bGnqAARZ08tSTdHWfeAc=
k8s.io/apiextensions-apiserver v0.33.4
h1:rtq5SeXiDbXmSwxsF0MLe2Mtv3SwprA6wp+5qh/CrOU=
diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go
index 0d9d4719..5d216b66 100644
--- a/pkg/cmd/cmd.go
+++ b/pkg/cmd/cmd.go
@@ -38,6 +38,13 @@ func PrintFlags(flags *pflag.FlagSet) {
})
}
+// WaitSignal blocks until the process receives SIGINT or SIGTERM and then
+// closes stop so that everything waiting on that channel is released.
+func WaitSignal(stop chan struct{}) {
+ // Buffered with capacity 1: the os/signal package does not block when
+ // delivering, so an unbuffered channel could miss the signal.
+ sigs := make(chan os.Signal, 1)
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ <-sigs
+ close(stop)
+}
+
func WaitSignalFunc(cancel context.CancelCauseFunc) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 638db0f4..56f77b3c 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -18,6 +18,7 @@
package kube
import (
+ "context"
"fmt"
"github.com/apache/dubbo-kubernetes/operator/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
@@ -27,11 +28,11 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/lazy"
"github.com/apache/dubbo-kubernetes/pkg/sleep"
"go.uber.org/atomic"
- istioclient "istio.io/client-go/pkg/clientset/versioned"
kubeExtClient
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/util/wait"
kubeVersion "k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -59,8 +60,9 @@ type client struct {
objectFilter kubetypes.DynamicObjectFilter
clusterID cluster.ID
informerWatchesPending *atomic.Int32
+ started atomic.Bool
crdWatcher kubetypes.CrdWatcher
- istio istioclient.Interface
+ fastSync bool
}
type Client interface {
@@ -82,6 +84,10 @@ type Client interface {
CrdWatcher() kubetypes.CrdWatcher
+ RunAndWait(stop <-chan struct{}) bool
+
+ WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs
...cache.InformerSynced) bool
+
Shutdown()
}
@@ -231,6 +237,64 @@ func (c *client) DynamicClientFor(gvk
schema.GroupVersionKind, obj *unstructured
return dr, nil
}
+// WaitForCacheSync waits, via the package-level WaitForCacheSync, until every
+// function in cacheSyncs reports true or stop is closed. When the client
+// tracks pending informer watches it additionally waits for that counter to
+// reach zero. It returns the result of the package-level WaitForCacheSync.
+func (c *client) WaitForCacheSync(name string, stop <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
+	if c.informerWatchesPending == nil {
+		return WaitForCacheSync(name, stop, cacheSyncs...)
+	}
+	// Build a fresh slice instead of appending to cacheSyncs: when a caller
+	// spreads a slice with spare capacity (fns...), appending in place would
+	// write the extra element into the caller's backing array.
+	syncFns := make([]cache.InformerSynced, 0, len(cacheSyncs)+1)
+	syncFns = append(syncFns, cacheSyncs...)
+	syncFns = append(syncFns, func() bool {
+		return c.informerWatchesPending.Load() == 0
+	})
+	return WaitForCacheSync(name, stop, syncFns...)
+}
+
+// Run starts the informer factory and, if a CRD watcher is configured, runs
+// it in its own goroutine. The started flag is swapped to true on every call
+// so that the first start and any repeated start are logged differently.
+func (c *client) Run(stop <-chan struct{}) {
+	c.informerFactory.Start(stop)
+	if watcher := c.crdWatcher; watcher != nil {
+		go watcher.Run(stop)
+	}
+	if firstStart := !c.started.Swap(true); firstStart {
+		klog.Infof("cluster %q kube client started", c.clusterID)
+		return
+	}
+	klog.V(2).Infof("cluster %q kube client started again", c.clusterID)
+}
+
+// RunAndWait starts the client (see Run) and blocks until its caches have
+// synced. It returns false if stop is closed, or a sync wait fails, before
+// that happens.
+//
+// The CRD watcher, when present, is always awaited first. After that the
+// regular path delegates to the informer factory, while the fastSync path
+// polls aggressively.
+func (c *client) RunAndWait(stop <-chan struct{}) bool {
+	c.Run(stop)
+	if c.crdWatcher != nil {
+		if !c.WaitForCacheSync("crd watcher", stop, c.crdWatcher.HasSynced) {
+			return false
+		}
+	}
+	if !c.fastSync {
+		return c.informerFactory.WaitForCacheSync(stop)
+	}
+	// WaitForCacheSync will virtually never be synced on the first call, as it
+	// is called immediately after Start(). That triggers a 100ms delay per
+	// call, which is often hit 2-3 times in a test, delaying tests.
+	// Instead, we use aggressive sync polling.
+	if !fastWaitForCacheSync(stop, c.informerFactory) {
+		return false
+	}
+	// WaitForCacheSync treats a nil counter as "nothing pending"; do the same
+	// here rather than dereferencing a nil pointer.
+	if c.informerWatchesPending == nil {
+		return true
+	}
+	err := wait.PollUntilContextTimeout(context.Background(), time.Microsecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+		select {
+		case <-stop:
+			return false, fmt.Errorf("channel closed")
+		default:
+		}
+		return c.informerWatchesPending.Load() == 0, nil
+	})
+	return err == nil
+}
+
func (c *client) bestEffortToGVR(gvk schema.GroupVersionKind, obj
*unstructured.Unstructured, namespace string) (schema.GroupVersionResource,
bool) {
if s, f :=
collections.All.FindByGroupVersionAliasesKind(config.FromKubernetesGVK(gvk)); f
{
gvr := s.GroupVersionResource()
@@ -294,6 +358,20 @@ func WaitForCacheSync(name string, stop <-chan struct{},
cacheSyncs ...cache.Inf
}
}
+// fastWaitForCacheSync polls informerFactory.WaitForCacheSync every 100
+// microseconds, for at most wait.ForeverTestTimeout, until it reports true.
+// It returns false if stop is closed or the poll times out first.
+func fastWaitForCacheSync(stop <-chan struct{}, informerFactory informerfactory.InformerFactory) bool {
+	// Each probe is handed an already-closed stop channel; presumably this
+	// makes the factory report its current state instead of blocking.
+	alreadyClosed := make(chan struct{})
+	close(alreadyClosed)
+	probe := func(context.Context) (bool, error) {
+		select {
+		case <-stop:
+			return false, fmt.Errorf("channel closed")
+		default:
+			return informerFactory.WaitForCacheSync(alreadyClosed), nil
+		}
+	}
+	pollErr := wait.PollUntilContextTimeout(context.Background(), time.Microsecond*100, wait.ForeverTestTimeout, true, probe)
+	return pollErr == nil
+}
+
func (c *client) Shutdown() {
c.informerFactory.Shutdown()
}
diff --git a/pkg/security/security.go b/pkg/security/security.go
index bcf5ac08..4476e0be 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -19,9 +19,12 @@ package security
import (
"context"
+ "fmt"
+ "google.golang.org/grpc/metadata"
"net/http"
"os"
"path/filepath"
+ "strings"
"time"
)
@@ -42,14 +45,14 @@ const (
)
const (
- CertSigner = "CertSigner"
+ BearerTokenPrefix = "Bearer "
+ K8sTokenPrefix = "Dubbo "
+ CertSigner = "CertSigner"
)
type AuthContext struct {
- // grpc context
GrpcContext context.Context
- // http request
- Request *http.Request
+ Request *http.Request
}
type Authenticator interface {
@@ -57,7 +60,6 @@ type Authenticator interface {
AuthenticatorType() string
}
-// SecretItem is the cached item in in-memory secret store.
type SecretItem struct {
CertificateChain []byte
PrivateKey []byte
@@ -67,7 +69,6 @@ type SecretItem struct {
ExpireTime time.Time
}
-// SecretManager defines secrets management interface which is used by SDS.
type SecretManager interface {
GenerateSecret(resourceName string) (*SecretItem, error)
}
@@ -80,6 +81,15 @@ type SdsCertificateConfig struct {
type AuthSource int
+const (
+ AuthSourceClientCertificate AuthSource = iota
+ AuthSourceIDToken
+)
+
+const (
+ authorizationMeta = "authorization"
+)
+
type KubernetesInfo struct {
PodName string
PodNamespace string
@@ -147,3 +157,39 @@ func CheckWorkloadCertificate(certChainFilePath,
keyFilePath, rootCertFilePath s
}
return true
}
+
+func ExtractBearerToken(ctx context.Context) (string, error) {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return "", fmt.Errorf("no metadata is attached")
+ }
+
+ authHeader, exists := md[authorizationMeta]
+ if !exists {
+ return "", fmt.Errorf("no HTTP authorization header exists")
+ }
+
+ for _, value := range authHeader {
+ if strings.HasPrefix(value, BearerTokenPrefix) {
+ return strings.TrimPrefix(value, BearerTokenPrefix), nil
+ }
+ }
+
+ return "", fmt.Errorf("no bearer token exists in HTTP authorization
header")
+}
+
+func ExtractRequestToken(req *http.Request) (string, error) {
+ value := req.Header.Get(authorizationMeta)
+ if value == "" {
+ return "", fmt.Errorf("no HTTP authorization header exists")
+ }
+
+ if strings.HasPrefix(value, BearerTokenPrefix) {
+ return strings.TrimPrefix(value, BearerTokenPrefix), nil
+ }
+ if strings.HasPrefix(value, K8sTokenPrefix) {
+ return strings.TrimPrefix(value, K8sTokenPrefix), nil
+ }
+
+ return "", fmt.Errorf("no bearer token exists in HTTP authorization
header")
+}
diff --git a/sail/cmd/sail-discovery/app/cmd.go
b/sail/cmd/sail-discovery/app/cmd.go
index 09e29571..12eb57bb 100644
--- a/sail/cmd/sail-discovery/app/cmd.go
+++ b/sail/cmd/sail-discovery/app/cmd.go
@@ -78,6 +78,9 @@ func newDiscoveryCommand() *cobra.Command {
if err := discoveryServer.Start(stop); err != nil {
return fmt.Errorf("failed to start discovery
service: %v", err)
}
+
+ cmd.WaitSignal(stop)
+
return nil
},
}
diff --git a/sail/pkg/bootstrap/ca.go b/sail/pkg/bootstrap/ca.go
index 0ad04100..6b88a03e 100644
--- a/sail/pkg/bootstrap/ca.go
+++ b/sail/pkg/bootstrap/ca.go
@@ -20,21 +20,28 @@ package bootstrap
import (
"bytes"
"context"
+ "encoding/json"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/env"
"github.com/apache/dubbo-kubernetes/pkg/security"
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
+ securityModel
"github.com/apache/dubbo-kubernetes/sail/pkg/security/model"
"github.com/apache/dubbo-kubernetes/security/pkg/cmd"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
caserver "github.com/apache/dubbo-kubernetes/security/pkg/server/ca"
+ "github.com/apache/dubbo-kubernetes/security/pkg/server/ca/authenticate"
+ "github.com/apache/dubbo-kubernetes/security/pkg/util"
"github.com/fsnotify/fsnotify"
+ "google.golang.org/grpc"
+ "istio.io/api/security/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"os"
"path"
+ "strings"
"time"
)
@@ -49,6 +56,10 @@ type caOptions struct {
}
var (
+ trustedIssuer = env.Register("TOKEN_ISSUER", "",
+ "OIDC token issuer. If set, will be used to check the tokens.")
+ audience = env.Register("AUDIENCE", "",
+ "Expected audience in the tokens. ")
LocalCertDir = env.Register("ROOT_CA_DIR", "./etc/cacerts",
"Location of a local or mounted CA root")
useRemoteCerts = env.Register("USE_REMOTE_CERTS", false,
@@ -59,6 +70,8 @@ var (
// TODO: Likely to be removed and added to mesh config
k8sSigner = env.Register("K8S_SIGNER", "",
"Kubernetes CA Signer type. Valid from Kubernetes 1.18").Get()
+ k8sInCluster = env.Register("KUBERNETES_SERVICE_HOST", "",
+ "Kubernetes service host, set automatically when running
in-cluster")
workloadCertTTL = env.Register("DEFAULT_WORKLOAD_CERT_TTL",
cmd.DefaultWorkloadCertTTL,
"The default TTL of issued workload certificates. Applied when
the client sets a "+
@@ -88,6 +101,55 @@ var (
"rotation. This interval is suggested to be larger than
10 minutes.")
)
+// initCAServer builds the CA gRPC service for the given certificate authority
+// using the configured maximum workload certificate TTL and the
+// authenticators from opts, and stores the result on the server. A
+// construction error is logged, not returned.
+func (s *Server) initCAServer(ca caserver.CertificateAuthority, opts *caOptions) {
+	srv, err := caserver.New(ca, maxWorkloadCertTTL.Get(), opts.Authenticators)
+	if err != nil {
+		klog.Errorf("failed to create dubbo ca server: %v", err)
+	}
+	s.caServer = srv
+}
+
+func (s *Server) RunCA(grpc *grpc.Server) {
+ iss := trustedIssuer.Get()
+ aud := audience.Get()
+
+ token, err := os.ReadFile(securityModel.ThirdPartyJwtPath)
+ if err == nil {
+ tok, err := detectAuthEnv(string(token))
+ if err != nil {
+ klog.Warningf("Starting with invalid K8S JWT token:
%v", err)
+ } else {
+ if iss == "" {
+ iss = tok.Iss
+ }
+ if len(tok.Aud) > 0 && len(aud) == 0 {
+ aud = tok.Aud[0]
+ }
+ }
+ }
+
+ // TODO: if not set, parse Istiod's own token (if present) and get the
issuer. The same issuer is used
+ // for all tokens - no need to configure twice. The token may also
include cluster info to auto-configure
+ // networking properties.
+ if iss != "" && // issuer set explicitly or extracted from our own JWT
+ k8sInCluster.Get() == "" { // not running in cluster - in
cluster use direct call to apiserver
+ // Add a custom authenticator using standard JWT validation, if
not running in K8S
+ // When running inside K8S - we can use the built-in validator,
which also check pod removal (invalidation).
+ jwtRule := v1beta1.JWTRule{Issuer: iss, Audiences:
[]string{aud}}
+ oidcAuth, err := authenticate.NewJwtAuthenticator(&jwtRule, nil)
+ if err == nil {
+ s.caServer.Authenticators =
append(s.caServer.Authenticators, oidcAuth)
+ klog.Info("Using out-of-cluster JWT authentication")
+ } else {
+ klog.Info("K8S token doesn't support OIDC, using only
in-cluster auth")
+ }
+ }
+
+ s.caServer.Register(grpc)
+
+ klog.Info("Dubbod CA has started")
+}
+
func (s *Server) loadCACerts(caOpts *caOptions, dir string) error {
if s.kubeClient == nil {
return nil
@@ -313,6 +375,27 @@ func (s *Server) handleCACertsFileWatch() {
}
}
+// detectAuthEnv decodes the payload (middle segment) of a compact JWT and
+// returns its claims. The signature is NOT verified; the result is only
+// suitable for reading hints such as issuer and audience.
+//
+// It returns an error if the token does not have exactly three dot-separated
+// segments, if the payload is not valid base64url, or if the payload is not
+// JSON matching authenticate.JwtPayload.
+func detectAuthEnv(jwt string) (*authenticate.JwtPayload, error) {
+	jwtSplit := strings.Split(jwt, ".")
+	if len(jwtSplit) != 3 {
+		// Never echo the token itself: callers log this error, and the token
+		// is a credential.
+		return nil, fmt.Errorf("invalid JWT: expected 3 dot-separated parts, got %d", len(jwtSplit))
+	}
+
+	payloadBytes, err := util.DecodeJwtPart(jwtSplit[1])
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode jwt: %w", err)
+	}
+
+	structuredPayload := &authenticate.JwtPayload{}
+	if err := json.Unmarshal(payloadBytes, structuredPayload); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal jwt: %w", err)
+	}
+
+	return structuredPayload, nil
+}
+
func handleEvent(s *Server) {
klog.Info("Update Dubbod cacerts")
diff --git a/sail/pkg/bootstrap/options.go b/sail/pkg/bootstrap/options.go
index a45e9b11..e4facdc9 100644
--- a/sail/pkg/bootstrap/options.go
+++ b/sail/pkg/bootstrap/options.go
@@ -49,6 +49,7 @@ type SailArgs struct {
CtrlZOptions *ctrlz.Options
KeepaliveOptions *keepalive.Options
KrtDebugger *krt.DebugHandler `json:"-"`
+ JwtRule string
}
type DiscoveryServerOptions struct {
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 89e49947..fed67bdb 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -46,6 +46,7 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/xds"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ra"
+ caserver "github.com/apache/dubbo-kubernetes/security/pkg/server/ca"
"github.com/fsnotify/fsnotify"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/http2"
@@ -100,12 +101,18 @@ type Server struct {
CA *ca.DubboCA
dnsNames []string
+ caServer *caserver.Server
+
certMu sync.RWMutex
dubbodCert *tls.Certificate
dubbodCertBundleWatcher *keycertbundle.Watcher
+
+ readinessProbes map[string]readinessProbe
}
+type readinessProbe func() bool
+
func NewServer(args *SailArgs, initFuncs ...func(*Server)) (*Server, error) {
e := model.NewEnvironment()
e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffix
@@ -199,6 +206,26 @@ func NewServer(args *SailArgs, initFuncs ...func(*Server))
(*Server, error) {
return nil, fmt.Errorf("error initializing secure gRPC
Listener: %v", err)
}
+ if s.kubeClient != nil {
+ s.initSecureWebhookServer(args)
+ if err := s.initConfigValidation(args); err != nil {
+ return nil, fmt.Errorf("error initializing config
validator: %v", err)
+ }
+ }
+
+ // TODO initRegistryEventHandlers?
+
+ // TODO initDiscoveryService?
+
+ s.startCA(caOpts)
+
+ if s.kubeClient != nil {
+ s.addStartFunc("kube client", func(stop <-chan struct{}) error {
+ s.kubeClient.RunAndWait(stop)
+ return nil
+ })
+ }
+
return s, nil
}
@@ -259,6 +286,29 @@ func (s *Server) Start(stop <-chan struct{}) error {
return nil
}
+// startCA initializes the CA server and schedules its start. The RA is
+// preferred when configured, otherwise the built-in CA is used; with neither
+// present this is a no-op. The registered "ca" start function runs the CA on
+// the secure gRPC server when there is one, else on the plain gRPC server.
+func (s *Server) startCA(caOpts *caOptions) {
+	switch {
+	case s.RA != nil:
+		klog.Infof("initializing CA server with RA")
+		s.initCAServer(s.RA, caOpts)
+	case s.CA != nil:
+		klog.Infof("initializing CA server with Dubbod CA")
+		s.initCAServer(s.CA, caOpts)
+	default:
+		return
+	}
+	s.addStartFunc("ca", func(stop <-chan struct{}) error {
+		target := s.secureGrpcServer
+		if target == nil {
+			target = s.grpcServer
+		}
+		klog.Infof("starting CA server")
+		s.RunCA(target)
+		return nil
+	})
+}
+
func (s *Server) initKubeClient(args *SailArgs) error {
if s.kubeClient != nil {
// Already initialized by startup arguments
@@ -677,6 +727,17 @@ func (s *Server) initDubbodCerts(args *SailArgs, host
string) error {
return err
}
+// dubbodReadyHandler answers readiness checks: 200 when every registered
+// readiness probe reports true, 503 as soon as one probe reports false (the
+// failing probe's name is logged and the remaining probes are not run).
+func (s *Server) dubbodReadyHandler(w http.ResponseWriter, _ *http.Request) {
+	status := http.StatusOK
+	for name, probe := range s.readinessProbes {
+		if !probe() {
+			klog.Warningf("%s is not ready", name)
+			status = http.StatusServiceUnavailable
+			break
+		}
+	}
+	w.WriteHeader(status)
+}
+
func getDNSNames(args *SailArgs, host string) []string {
// Append custom hostname if there is any
customHost := features.DubbodServiceCustomHost
diff --git a/sail/pkg/bootstrap/validation.go b/sail/pkg/bootstrap/validation.go
new file mode 100644
index 00000000..7f7fd73a
--- /dev/null
+++ b/sail/pkg/bootstrap/validation.go
@@ -0,0 +1,13 @@
+package bootstrap
+
+import (
+ "k8s.io/klog/v2"
+)
+
+// initConfigValidation is currently a placeholder: when a kube client is
+// available it only logs that the config validator is being initialized.
+// It always returns nil.
+func (s *Server) initConfigValidation(args *SailArgs) error {
+	if s.kubeClient != nil {
+		klog.Info("initializing config validator")
+	}
+	return nil
+}
diff --git a/sail/pkg/bootstrap/webhook.go b/sail/pkg/bootstrap/webhook.go
new file mode 100644
index 00000000..6fd29e77
--- /dev/null
+++ b/sail/pkg/bootstrap/webhook.go
@@ -0,0 +1,52 @@
+package bootstrap
+
+import (
+ "crypto/tls"
+ sec_model "github.com/apache/dubbo-kubernetes/pkg/model"
+ "k8s.io/klog/v2"
+ "log"
+ "net/http"
+ "strings"
+)
+
+// httpServerErrorLogWriter is an io.Writer that forwards messages written to
+// it to klog.
+type httpServerErrorLogWriter struct{}
+
+// Write logs p (minus one trailing newline) through klog. TLS handshake
+// errors that end in ": EOF" are logged at verbosity 2; everything else is
+// logged at the default info level. It always reports the full length of p
+// as written and never returns an error.
+func (*httpServerErrorLogWriter) Write(p []byte) (int, error) {
+	msg := strings.TrimSuffix(string(p), "\n")
+	handshakeEOF := strings.HasPrefix(msg, "http: TLS handshake error") && strings.HasSuffix(msg, ": EOF")
+	if !handshakeEOF {
+		klog.Info(msg)
+		return len(p), nil
+	}
+	klog.V(2).Info(msg)
+	return len(p), nil
+}
+
+func (s *Server) initSecureWebhookServer(args *SailArgs) {
+ // create the https server for hosting the k8s injectionWebhook
handlers.
+ if args.ServerOptions.HTTPSAddr == "" {
+ s.httpsMux = s.httpMux
+ klog.Infof("HTTPS port is disabled, multiplexing webhooks on
the httpAddr %v", args.ServerOptions.HTTPAddr)
+ return
+ }
+
+ tlsConfig := &tls.Config{
+ GetCertificate: s.getDubbodCertificate,
+ MinVersion: tls.VersionTLS12,
+ CipherSuites: args.ServerOptions.TLSOptions.CipherSuits,
+ }
+ // Compliance for control plane validation and injection webhook server.
+ sec_model.EnforceGoCompliance(tlsConfig)
+
+ klog.Info("initializing secure webhook server for dubbod webhooks")
+ // create the https server for hosting the k8s injectionWebhook
handlers.
+ s.httpsMux = http.NewServeMux()
+ s.httpsServer = &http.Server{
+ Addr: args.ServerOptions.HTTPSAddr,
+ ErrorLog: log.New(&httpServerErrorLogWriter{}, "", 0),
+ Handler: s.httpsMux,
+ TLSConfig: tlsConfig,
+ }
+
+ // register istiodReadyHandler on the httpsMux so that readiness can
also be checked remotely
+ s.httpsMux.HandleFunc("/ready", s.dubbodReadyHandler)
+}
diff --git a/sail/pkg/model/xds_cache.go b/sail/pkg/model/xds_cache.go
index 7ebeee13..10e53ef6 100644
--- a/sail/pkg/model/xds_cache.go
+++ b/sail/pkg/model/xds_cache.go
@@ -21,7 +21,9 @@ import (
"github.com/apache/dubbo-kubernetes/sail/pkg/features"
)
-type XdsCache interface{}
+// XdsCache is the interface implemented by the xDS cache variants in this
+// package (XdsCacheImpl and DisabledCache both provide Run).
+type XdsCache interface {
+ // Run is given a stop channel; both implementations in this file are
+ // currently empty.
+ Run(stop <-chan struct{})
+}
type DisabledCache struct{}
@@ -44,3 +46,8 @@ func NewXdsCache() XdsCache {
return cache
}
+
+// Run satisfies XdsCache. It is currently a no-op.
+func (x XdsCacheImpl) Run(stop <-chan struct{}) {}
+
+// Run satisfies XdsCache. It is currently a no-op.
+func (d DisabledCache) Run(stop <-chan struct{}) {
+}
diff --git a/sail/pkg/security/model/authentication.go
b/sail/pkg/security/model/authentication.go
new file mode 100644
index 00000000..a4c135e5
--- /dev/null
+++ b/sail/pkg/security/model/authentication.go
@@ -0,0 +1,5 @@
+package model
+
+const (
+ ThirdPartyJwtPath = "/var/run/secrets/tokens/dubbo-token"
+)
diff --git a/sail/pkg/xds/discovery.go b/sail/pkg/xds/discovery.go
index a4c58ed8..57d9d4ba 100644
--- a/sail/pkg/xds/discovery.go
+++ b/sail/pkg/xds/discovery.go
@@ -29,25 +29,36 @@ import (
"time"
)
+// DebounceOptions groups the settings passed to debounce.
+// NOTE(review): debounce is still an empty stub in this change, so none of
+// these fields is read yet; the per-field meanings below are inferred from
+// the names and should be confirmed once debounce is implemented.
+type DebounceOptions struct {
+ // presumably the quiet period to wait after an update before pushing
+ DebounceAfter time.Duration
+ // presumably the upper bound on how long a push may be delayed
+ debounceMax time.Duration
+ // presumably toggles debouncing for EDS updates
+ enableEDSDebounce bool
+}
+
type DiscoveryServer struct {
- Env *model.Environment
- serverReady atomic.Bool
- DiscoveryStartTime time.Time
- ClusterAliases map[cluster.ID]cluster.ID
- Cache model.XdsCache
- pushQueue *PushQueue
- krtDebugger *krt.DebugHandler
- InboundUpdates *atomic.Int64
- CommittedUpdates *atomic.Int64
- RequestRateLimit *rate.Limiter
- ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest)
(*model.PushRequest, bool)
+ Env *model.Environment
+ serverReady atomic.Bool
+ DiscoveryStartTime time.Time
+ ClusterAliases map[cluster.ID]cluster.ID
+ Cache model.XdsCache
+ pushQueue *PushQueue
+ krtDebugger *krt.DebugHandler
+ InboundUpdates *atomic.Int64
+ CommittedUpdates *atomic.Int64
+ RequestRateLimit *rate.Limiter
+ ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest)
(*model.PushRequest, bool)
+ pushChannel chan *model.PushRequest
+ DebounceOptions DebounceOptions
+ concurrentPushLimit chan struct{}
}
func NewDiscoveryServer(env *model.Environment, clusterAliases
map[string]string, debugger *krt.DebugHandler) *DiscoveryServer {
out := &DiscoveryServer{
- Env: env,
- Cache: env.Cache,
- krtDebugger: debugger,
+ Env: env,
+ Cache: env.Cache,
+ krtDebugger: debugger,
+ InboundUpdates: atomic.NewInt64(0),
+ CommittedUpdates: atomic.NewInt64(0),
}
out.ClusterAliases = make(map[cluster.ID]cluster.ID)
for alias := range clusterAliases {
@@ -61,6 +72,12 @@ func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}
+// Start launches three goroutines: the update handler, the push sender and
+// the xDS cache's Run loop. Each receives stopCh. Start itself returns
+// immediately and does not wait for them.
+func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
+ go s.handleUpdates(stopCh)
+ go s.sendPushes(stopCh)
+ go s.Cache.Run(stopCh)
+}
+
func (s *DiscoveryServer) CachesSynced() {
klog.Infof("All caches have been synced up in %v, marking server
ready", time.Since(s.DiscoveryStartTime))
s.serverReady.Store(true)
@@ -70,6 +87,21 @@ func (s *DiscoveryServer) Shutdown() {
s.pushQueue.ShutDown()
}
+// Push is the push callback handed to debounce. It is currently a no-op.
+func (s *DiscoveryServer) Push(req *model.PushRequest) {}
+
func (s *DiscoveryServer) globalPushContext() *model.PushContext {
return s.Env.PushContext()
}
+
+// handleUpdates feeds the server's push channel, debounce options, Push
+// callback and committed-updates counter to debounce.
+func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
+	debounce(s.pushChannel, stopCh, s.DebounceOptions, s.Push, s.CommittedUpdates)
+}
+
+// sendPushes hands the server's concurrency-limit channel and push queue to
+// doSendPushes.
+func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
+ doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
+}
+
+// debounce is a placeholder with an empty body: it accepts the push request
+// channel, stop channel, options, push callback and sent-updates counter but
+// does nothing with them yet.
+func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts DebounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
+}
+
+// doSendPushes is a placeholder with an empty body: it accepts the stop
+// channel, semaphore channel and push queue but does nothing with them yet.
+func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {}
diff --git a/security/pkg/server/ca/authenticate/cert_authenticator.go
b/security/pkg/server/ca/authenticate/cert_authenticator.go
new file mode 100644
index 00000000..e40b2507
--- /dev/null
+++ b/security/pkg/server/ca/authenticate/cert_authenticator.go
@@ -0,0 +1,85 @@
+package authenticate
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/peer"
+ "net/http"
+)
+
+const (
+ ClientCertAuthenticatorType = "ClientCertAuthenticator"
+)
+
+// ClientCertAuthenticator is a stateless security.Authenticator that derives
+// caller identities from the verified client certificate chain.
+type ClientCertAuthenticator struct{}
+
+var _ security.Authenticator = &ClientCertAuthenticator{}
+
+// AuthenticatorType returns ClientCertAuthenticatorType.
+func (cca *ClientCertAuthenticator) AuthenticatorType() string {
+ return ClientCertAuthenticatorType
+}
+
+// Authenticate extracts identities from the presented client certificate.
+// The gRPC context is used when set, otherwise the HTTP request; when neither
+// is set the result is (nil, nil).
+//
+// The certificate chain must already have been validated before this method
+// is called; no chain validation happens here.
+func (cca *ClientCertAuthenticator) Authenticate(authCtx security.AuthContext) (*security.Caller, error) {
+	switch {
+	case authCtx.GrpcContext != nil:
+		return cca.authenticateGrpc(authCtx.GrpcContext)
+	case authCtx.Request != nil:
+		return cca.authenticateHTTP(authCtx.Request)
+	default:
+		return nil, nil
+	}
+}
+
+// authenticateGrpc builds a Caller from the TLS peer information stored in a
+// gRPC context. Identities are extracted from the extensions of the leaf
+// certificate of the first verified chain.
+//
+// It returns an error when the context carries no peer/auth info, when the
+// auth type is not "tls", when the auth info is not a credentials.TLSInfo,
+// when there is no verified chain, or when identity extraction fails.
+func (cca *ClientCertAuthenticator) authenticateGrpc(ctx context.Context) (*security.Caller, error) {
+	// Named p, not peer, so the peer package is not shadowed.
+	p, ok := peer.FromContext(ctx)
+	if !ok || p.AuthInfo == nil {
+		return nil, fmt.Errorf("no client certificate is presented")
+	}
+
+	if authType := p.AuthInfo.AuthType(); authType != "tls" {
+		return nil, fmt.Errorf("unsupported auth type: %q", authType)
+	}
+
+	// AuthType is just a string any credentials implementation may return, so
+	// check the assertion instead of letting a mismatch panic the handler.
+	tlsInfo, ok := p.AuthInfo.(credentials.TLSInfo)
+	if !ok {
+		return nil, fmt.Errorf("unexpected auth info type %T for tls auth", p.AuthInfo)
+	}
+	chains := tlsInfo.State.VerifiedChains
+	if len(chains) == 0 || len(chains[0]) == 0 {
+		return nil, fmt.Errorf("no verified chain is found")
+	}
+
+	ids, err := util.ExtractIDs(chains[0][0].Extensions)
+	if err != nil {
+		return nil, err
+	}
+
+	return &security.Caller{
+		AuthSource: security.AuthSourceClientCertificate,
+		Identities: ids,
+	}, nil
+}
+
+// authenticateHTTP builds a Caller from the TLS state of an HTTP request.
+// Identities are extracted from the extensions of the leaf certificate of the
+// first verified chain. A request without TLS state or verified chains, an
+// empty chain, or a failed identity extraction yields an error.
+func (cca *ClientCertAuthenticator) authenticateHTTP(req *http.Request) (*security.Caller, error) {
+	tlsState := req.TLS
+	if tlsState == nil || tlsState.VerifiedChains == nil {
+		return nil, fmt.Errorf("no client certificate is presented")
+	}
+
+	verified := tlsState.VerifiedChains
+	if len(verified) == 0 || len(verified[0]) == 0 {
+		return nil, fmt.Errorf("no verified chain is found")
+	}
+
+	leaf := verified[0][0]
+	ids, err := util.ExtractIDs(leaf.Extensions)
+	if err != nil {
+		return nil, err
+	}
+
+	caller := &security.Caller{
+		AuthSource: security.AuthSourceClientCertificate,
+		Identities: ids,
+	}
+	return caller, nil
+}
diff --git a/security/pkg/server/ca/authenticate/oidc.go
b/security/pkg/server/ca/authenticate/oidc.go
new file mode 100644
index 00000000..9cbb1d8d
--- /dev/null
+++ b/security/pkg/server/ca/authenticate/oidc.go
@@ -0,0 +1,117 @@
+package authenticate
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/mesh"
+ "github.com/apache/dubbo-kubernetes/pkg/security"
+ "github.com/apache/dubbo-kubernetes/pkg/spiffe"
+ oidc "github.com/coreos/go-oidc/v3/oidc"
+ "istio.io/api/security/v1beta1"
+ "strings"
+)
+
+const (
+ IDTokenAuthenticatorType = "IDTokenAuthenticator"
+)
+
+// JwtPayload holds the subset of JWT claims this package reads.
+// NOTE(review): RFC 7519 allows "aud" to be a single string as well as an
+// array; encoding/json rejects a string for a []string field, so such tokens
+// would fail to decode here — confirm whether they need to be supported.
+type JwtPayload struct {
+ Aud []string `json:"aud"`
+ Exp int `json:"exp"`
+ Iss string `json:"iss"`
+ Sub string `json:"sub"`
+}
+
+// JwtAuthenticator authenticates callers by verifying an ID token with an
+// OIDC verifier and checking its audience against the configured list.
+type JwtAuthenticator struct {
+ // holder of a mesh configuration for dynamically updating trust domain
+ meshHolder mesh.Holder
+ // audiences accepted by checkAudience; a token needs at least one match
+ audiences []string
+ // verifier is created once in NewJwtAuthenticator
+ verifier *oidc.IDTokenVerifier
+}
+
+// NewJwtAuthenticator creates a JwtAuthenticator for the issuer, JWKS URI and
+// audiences of jwtRule. When the rule has no JWKS URI, OIDC discovery against
+// the issuer is performed, and a discovery failure is returned as an error.
+// With a JWKS URI, a remote key set for that URI is used directly.
+//
+// meshWatcher is stored as the mesh holder used when building identities.
+func NewJwtAuthenticator(jwtRule *v1beta1.JWTRule, meshWatcher mesh.Watcher) (*JwtAuthenticator, error) {
+	issuer := jwtRule.GetIssuer()
+	jwksURL := jwtRule.GetJwksUri()
+	// The key of a JWT issuer may change, so the key may need to be updated.
+	// Based on https://pkg.go.dev/github.com/coreos/go-oidc/v3/oidc#NewRemoteKeySet
+	// the oidc library handles caching and cache invalidation. Thus, the verifier
+	// is only created once in the constructor.
+	var verifier *oidc.IDTokenVerifier
+	if len(jwksURL) == 0 {
+		// OIDC discovery is used if jwksURL is not set.
+		provider, err := oidc.NewProvider(context.Background(), issuer)
+		// OIDC discovery may fail, e.g. http request for the OIDC server may fail.
+		if err != nil {
+			return nil, fmt.Errorf("failed at creating an OIDC provider for %v: %w", issuer, err)
+		}
+		verifier = provider.Verifier(&oidc.Config{SkipClientIDCheck: true})
+	} else {
+		keySet := oidc.NewRemoteKeySet(context.Background(), jwksURL)
+		verifier = oidc.NewVerifier(issuer, keySet, &oidc.Config{SkipClientIDCheck: true})
+	}
+	return &JwtAuthenticator{
+		meshHolder: meshWatcher,
+		verifier:   verifier,
+		// Use the generated getter like the two reads above: it is nil-safe,
+		// whereas a direct field access panics on a nil rule.
+		audiences: jwtRule.GetAudiences(),
+	}, nil
+}
+
+func (j *JwtAuthenticator) Authenticate(authRequest security.AuthContext)
(*security.Caller, error) {
+ if authRequest.GrpcContext != nil {
+ bearerToken, err :=
security.ExtractBearerToken(authRequest.GrpcContext)
+ if err != nil {
+ return nil, fmt.Errorf("ID token extraction error: %v",
err)
+ }
+ return j.authenticate(authRequest.GrpcContext, bearerToken)
+ }
+ if authRequest.Request != nil {
+ bearerToken, err :=
security.ExtractRequestToken(authRequest.Request)
+ if err != nil {
+ return nil, fmt.Errorf("target JWT extraction error:
%v", err)
+ }
+ return j.authenticate(authRequest.Request.Context(),
bearerToken)
+ }
+ return nil, nil
+}
+
+func (j *JwtAuthenticator) authenticate(ctx context.Context, bearerToken
string) (*security.Caller, error) {
+ idToken, err := j.verifier.Verify(ctx, bearerToken)
+ if err != nil {
+ return nil, fmt.Errorf("failed to verify the JWT token (error
%v)", err)
+ }
+
+ sa := JwtPayload{}
+ // "aud" for trust domain, "sub" has
"system:serviceaccount:$namespace:$serviceaccount".
+ // in future trust domain may use another field as a standard is
defined.
+ if err := idToken.Claims(&sa); err != nil {
+ return nil, fmt.Errorf("failed to extract claims from ID token:
%v", err)
+ }
+ if !strings.HasPrefix(sa.Sub, "system:serviceaccount") {
+ return nil, fmt.Errorf("invalid sub %v", sa.Sub)
+ }
+ parts := strings.Split(sa.Sub, ":")
+ ns := parts[2]
+ ksa := parts[3]
+ if !checkAudience(sa.Aud, j.audiences) {
+ return nil, fmt.Errorf("invalid audiences %v", sa.Aud)
+ }
+ return &security.Caller{
+ AuthSource: security.AuthSourceIDToken,
+ Identities:
[]string{spiffe.MustGenSpiffeURI(j.meshHolder.Mesh(), ns, ksa)},
+ }, nil
+}
+
+// AuthenticatorType returns IDTokenAuthenticatorType.
+// NOTE(review): this method uses a value receiver while Authenticate uses a
+// pointer receiver — confirm whether the mix is intentional.
+func (j JwtAuthenticator) AuthenticatorType() string {
+ return IDTokenAuthenticatorType
+}
+
+// checkAudience reports whether audToCheck and audExpected share at least one
+// element. It is false when either list is empty.
+func checkAudience(audToCheck []string, audExpected []string) bool {
+	allowed := make(map[string]struct{}, len(audExpected))
+	for _, want := range audExpected {
+		allowed[want] = struct{}{}
+	}
+	for _, got := range audToCheck {
+		if _, ok := allowed[got]; ok {
+			return true
+		}
+	}
+	return false
+}
diff --git a/security/pkg/server/ca/server.go b/security/pkg/server/ca/server.go
index a8479613..cb5af79a 100644
--- a/security/pkg/server/ca/server.go
+++ b/security/pkg/server/ca/server.go
@@ -18,10 +18,36 @@
package ca
import (
+ "github.com/apache/dubbo-kubernetes/pkg/security"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/ca"
"github.com/apache/dubbo-kubernetes/security/pkg/pki/util"
+ "google.golang.org/grpc"
+ pb "istio.io/api/security/v1alpha1"
+ "time"
)
+// Server is the CA gRPC service. It embeds the generated
+// UnimplementedIstioCertificateServiceServer and carries the authenticators,
+// certificate TTL and certificate authority supplied to New.
+type Server struct {
+ pb.UnimplementedIstioCertificateServiceServer
+ // Authenticators is exported so that callers can append to it after New.
+ Authenticators []security.Authenticator
+ serverCertTTL time.Duration
+ ca CertificateAuthority
+}
+
+// New builds a CA Server around the given certificate authority, certificate
+// TTL and authenticators. If the CA's key/cert bundle has a non-empty root
+// certificate, the bundle is passed to RecordCertsExpiry first. The returned
+// error is currently always nil.
+func New(ca CertificateAuthority, ttl time.Duration, authenticators []security.Authenticator) (*Server, error) {
+	if bundle := ca.GetCAKeyCertBundle(); len(bundle.GetRootCertPem()) != 0 {
+		RecordCertsExpiry(bundle)
+	}
+
+	return &Server{
+		Authenticators: authenticators,
+		serverCertTTL:  ttl,
+		ca:             ca,
+	}, nil
+}
+
type CertificateAuthority interface {
Sign(csrPEM []byte, opts ca.CertOpts) ([]byte, error)
SignWithCertChain(csrPEM []byte, opts ca.CertOpts) ([]string, error)
@@ -29,3 +55,7 @@ type CertificateAuthority interface {
}
func RecordCertsExpiry(keyCertBundle *util.KeyCertBundle) {}
+
+// Register registers s as the IstioCertificateService implementation on
+// grpcServer.
+func (s *Server) Register(grpcServer *grpc.Server) {
+ pb.RegisterIstioCertificateServiceServer(grpcServer, s)
+}
diff --git a/security/pkg/util/jwtutil.go b/security/pkg/util/jwtutil.go
new file mode 100644
index 00000000..5d56a554
--- /dev/null
+++ b/security/pkg/util/jwtutil.go
@@ -0,0 +1,14 @@
+package util
+
+import (
+ "encoding/base64"
+ "strings"
+)
+
+// DecodeJwtPart decodes one base64url segment of a JWT. The segment is padded
+// with '=' up to a multiple of four characters before decoding, so unpadded
+// input (as JWTs normally use) is accepted. Invalid input yields the error
+// from the base64 decoder.
+func DecodeJwtPart(seg string) ([]byte, error) {
+	missing := (4 - len(seg)%4) % 4
+	return base64.URLEncoding.DecodeString(seg + strings.Repeat("=", missing))
+}