This is an automated email from the ASF dual-hosted git repository.
zhongxjian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-kubernetes.git
The following commit(s) were added to refs/heads/master by this push:
new 08d54d47 [chore] fix webhook and crd controller (#799)
08d54d47 is described below
commit 08d54d47d5571603bc54d26ea25bdf8aa98708db
Author: Jian Zhong <[email protected]>
AuthorDate: Fri Oct 3 01:30:48 2025 +0800
[chore] fix webhook and crd controller (#799)
---
Dockerfile | 7 +
go.mod | 4 +-
go.sum | 8 +-
.../dubbo-discovery/files/grpc-agent.yaml | 73 ++-
pkg/config/constants/constants.go | 18 +-
pkg/config/mesh/mesh.go | 8 +-
pkg/config/schema/collections/collections.go | 80 +++-
pkg/config/schema/gvk/resources.go | 12 +
pkg/config/schema/gvr/resources.go | 6 +
pkg/config/schema/kubeclient/common.go | 3 +
.../kubeclient/{resource.gen.go => resource.go} | 19 +
pkg/config/schema/kubetypes/resources.go | 15 +
pkg/dubbo-agent/agent.go | 3 +-
pkg/dubbo-agent/config/config.go | 33 +-
pkg/jwt/jwt.go | 1 -
pkg/kube/client.go | 26 ++
pkg/kube/controllers/queue.go | 4 +
pkg/kube/kclient/client.go | 72 ++-
pkg/kube/kclient/crdwatcher.go | 120 +++++
pkg/kube/kclient/delayed.go | 194 ++++++++
pkg/kube/krt/map.go | 144 ++++++
pkg/kube/kubetypes/types.go | 42 ++
pkg/model/proxy.go | 39 --
pkg/security/security.go | 35 +-
pkg/webhooks/server/server.go | 237 ++++++++++
pkg/webhooks/validation/controller/controller.go | 189 ++++++++
pkg/xds/server.go | 3 +-
sail/cmd/sail-agent/app/cmd.go | 22 +-
sail/cmd/sail-agent/app/wait.go | 19 +-
sail/cmd/sail-agent/options/agent.go | 3 +
sail/cmd/sail-agent/options/agent_proxy.go | 1 -
sail/cmd/sail-agent/options/options.go | 15 +-
sail/cmd/sail-agent/options/security.go | 23 +-
sail/pkg/bootstrap/configcontroller.go | 16 +-
sail/pkg/bootstrap/{ca.go => dubbo_ca.go} | 6 -
sail/pkg/bootstrap/proxylessinjector.go | 15 +-
sail/pkg/bootstrap/server.go | 34 +-
sail/pkg/bootstrap/validation.go | 21 +
sail/pkg/config/kube/crd/config.go | 45 ++
sail/pkg/config/kube/crd/conversion.go | 80 +++-
sail/pkg/config/kube/crdclient/client.go | 360 +++++++++++++-
sail/pkg/config/kube/crdclient/types.go | 520 +++++++++++++++++++++
sail/pkg/config/kube/file/controller.go | 7 +-
sail/pkg/features/sail.go | 19 +-
sail/pkg/model/cluster_local.go | 110 ++++-
sail/pkg/model/config.go | 11 -
sail/pkg/model/context.go | 15 +
sail/pkg/model/network.go | 10 +
security/pkg/credentialfetcher/fetcher.go | 21 -
security/pkg/pki/ca/selfsignedcarootcertrotator.go | 2 +-
security/tools/generate_cert/main.go | 2 +-
51 files changed, 2490 insertions(+), 282 deletions(-)
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..310b911d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,7 @@
+FROM gcr.io/distroless/static:debug AS discovery
+COPY sail-discovery .
+ENTRYPOINT ["./sail-discovery"]
+
+FROM gcr.io/distroless/static:debug AS agent
+COPY sail-agent .
+ENTRYPOINT ["./sail-agent"]
diff --git a/go.mod b/go.mod
index 29fc61a5..8691246f 100644
--- a/go.mod
+++ b/go.mod
@@ -34,6 +34,7 @@ require (
github.com/docker/docker-credential-helpers v0.9.3
github.com/docker/go-connections v0.5.0
github.com/envoyproxy/go-control-plane/envoy
v1.32.5-0.20250627145903-197b96a9c7f8
+ github.com/evanphx/json-patch/v5 v5.9.11
github.com/fatih/color v1.18.0
github.com/fsnotify/fsnotify v1.9.0
github.com/go-git/go-billy/v5 v5.6.2
@@ -67,7 +68,8 @@ require (
google.golang.org/protobuf v1.36.7
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.18.6
- istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee
+ istio.io/api v1.27.1-0.20250820125923-f5a5d3a605a9
+ istio.io/client-go v1.27.1
k8s.io/api v0.33.4
k8s.io/apiextensions-apiserver v0.33.4
k8s.io/apimachinery v0.33.4
diff --git a/go.sum b/go.sum
index b49d760b..aa358fc3 100644
--- a/go.sum
+++ b/go.sum
@@ -257,6 +257,8 @@ github.com/envoyproxy/protoc-gen-validate v1.2.1
h1:DEo3O99U8j4hBFwbJfrz9VtgcDfU
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/evanphx/json-patch v5.9.11+incompatible
h1:ixHHqfcGvxhWkniF1tWxBHA0yb4Z+d1UQi45df52xW8=
github.com/evanphx/json-patch v5.9.11+incompatible/go.mod
h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
+github.com/evanphx/json-patch/v5 v5.9.11
h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
+github.com/evanphx/json-patch/v5 v5.9.11/go.mod
h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
github.com/fatih/color v1.7.0/go.mod
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod
h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
@@ -872,8 +874,10 @@ helm.sh/helm/v3 v3.18.6
h1:S/2CqcYnNfLckkHLI0VgQbxgcDaU3N4A/46E3n9wSNY=
helm.sh/helm/v3 v3.18.6/go.mod h1:L/dXDR2r539oPlFP1PJqKAC1CUgqHJDLkxKpDGrWnyg=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee
h1:pDLJDbgzc69hw3EQHIhO+j/efHketfwARg3eL+QIqss=
-istio.io/api v1.26.0-alpha.0.0.20250908200844-f7a34ed800ee/go.mod
h1:BD3qv/ekm16kvSgvSpuiDawgKhEwG97wx849CednJSg=
+istio.io/api v1.27.1-0.20250820125923-f5a5d3a605a9
h1:gVTxnhYGJ1pY+iqcz/mrbPSpdkR9Z36srCz7TybrXGY=
+istio.io/api v1.27.1-0.20250820125923-f5a5d3a605a9/go.mod
h1:DTVGH6CLXj5W8FF9JUD3Tis78iRgT1WeuAnxfTz21Wg=
+istio.io/client-go v1.27.1 h1:VWEtOzmv9gi4x3OPjN5wMFOBV1i95UIGcbYXoP4VVuA=
+istio.io/client-go v1.27.1/go.mod
h1:otQns/CCDd4EoyEFWp8w+ksTP0T05baYTIx5FxqS8eM=
k8s.io/api v0.33.4 h1:oTzrFVNPXBjMu0IlpA2eDDIU49jsuEorGHB4cvKupkk=
k8s.io/api v0.33.4/go.mod h1:VHQZ4cuxQ9sCUMESJV5+Fe8bGnqAARZ08tSTdHWfeAc=
k8s.io/apiextensions-apiserver v0.33.4
h1:rtq5SeXiDbXmSwxsF0MLe2Mtv3SwprA6wp+5qh/CrOU=
diff --git
a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
index 69b462ca..22215df1 100644
--- a/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
+++ b/manifests/charts/dubbo-control/dubbo-discovery/files/grpc-agent.yaml
@@ -1 +1,72 @@
-# inject grpc template
\ No newline at end of file
+metadata:
+ annotations: {}
+spec:
+ containers:
+ - name: dubbo-proxy
+ image: "proxyxds"
+ ports:
+ - containerPort: 15020
+ protocol: TCP
+ name: mesh-metrics
+ args:
+ - proxy
+ lifecycle:
+ postStart:
+ exec:
+ command:
+ - sail-agent
+ - wait
+ - --url=http://localhost:15020/healthz/ready
+ env:
+ - name: SAIL_META_GENERATOR
+ value: grpc
+ - name: OUTPUT_CERTS
+ value: /var/lib/dubbo/data
+ readinessProbe:
+ httpGet:
+ path: /healthz/ready
+ port: 15020
+ initialDelaySeconds: 0
+ periodSeconds: 15
+ timeoutSeconds: 3
+ failureThreshold: 4
+ resources:
+ requests:
+ cpu: 100m
+ memory: 128Mi
+ limits:
+ cpu: 2000m
+ memory: 1024Mi
+ volumeMounts:
+ - name: workload-socket
+ mountPath: /var/run/secrets/workload-spiffe-uds
+ - name: workload-certs
+ mountPath: /var/run/secrets/workload-spiffe-credentials
+ - name: dubbod-ca-cert
+ mountPath: /var/run/secrets/dubbo
+ - name: dubbo-data
+ mountPath: /var/lib/dubbo/data
+ # UDS channel between istioagent and gRPC client for XDS/SDS
+ - mountPath: /etc/dubbo/proxy
+ name: dubbo-xds
+ - mountPath: /var/run/secrets/tokens
+ name: dubbo-token
+ - mountPath: /etc/certs/
+ name: dubbo-certs
+ readOnly: true
+ - name: dubbo-podinfo
+ mountPath: /etc/dubbo/pod
+- name: dubbo-proxy
+ env:
+ - name: "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
+ value: "true"
+ - name: "GRPC_XDS_BOOTSTRAP"
+ value: "/etc/dubbo/proxy/grpc-bootstrap.json"
+ volumeMounts:
+ - mountPath: /var/lib/dubbo/data
+ name: dubbo-data
+ # UDS channel between dubboagent and gRPC client for XDS/SDS
+ - mountPath: /etc/dubbo/proxy
+ name: dubbo-xds
+ - mountPath: /var/run/secrets/workload-spiffe-credentials
+ name: workload-certs
\ No newline at end of file
diff --git a/pkg/config/constants/constants.go
b/pkg/config/constants/constants.go
index d558fafa..4c779b73 100644
--- a/pkg/config/constants/constants.go
+++ b/pkg/config/constants/constants.go
@@ -20,24 +20,20 @@ package constants
const (
DubboSystemNamespace = "dubbo-system"
DefaultClusterLocalDomain = "cluster.local"
- KeyFilename = "key.pem"
- CertChainFilename = "cert-chain.pem"
DefaultClusterName = "Kubernetes"
ServiceClusterName = "dubbo-proxy"
ConfigPathDir = "./etc/dubbo/proxy"
+ KeyFilename = "key.pem"
+ CertChainFilename = "cert-chain.pem"
CertProviderDubbod = "dubbod"
+ CertProviderKubernetes = "kubernetes"
CertProviderKubernetesSignerPrefix = "k8s.io/"
-
- CACertNamespaceConfigMapDataName = "root-cert.pem"
+ CertProviderNone = "none"
+ CertProviderCustom = "custom"
+ CACertNamespaceConfigMapDataName = "root-cert.pem"
PodInfoAnnotationsPath = "./etc/dubbo/pod/annotations"
- CertProviderNone = "none"
- CertProviderCustom = "custom"
-
- ThirdPartyJwtPath = "./var/run/secrets/tokens/dubbo-token"
-
- CertProviderKubernetes = "kubernetes"
SailWellKnownDNSCertPath = "./var/run/secrets/dubbod/tls/"
SailWellKnownDNSCaCertPath = "./var/run/secrets/dubbod/ca/"
@@ -46,4 +42,6 @@ const (
DefaultSailTLSKey = SailWellKnownDNSCertPath + "tls.key"
DefaultSailTLSCaCert = SailWellKnownDNSCaCertPath +
"root-cert.pem"
DefaultSailTLSCaCertAlternatePath = SailWellKnownDNSCertPath + "ca.crt"
+
+ AlwaysReject = "internal.dubbo.io/webhook-always-reject"
)
diff --git a/pkg/config/mesh/mesh.go b/pkg/config/mesh/mesh.go
index 3fdf4cf4..d869124a 100644
--- a/pkg/config/mesh/mesh.go
+++ b/pkg/config/mesh/mesh.go
@@ -46,12 +46,8 @@ func DefaultProxyConfig() *meshconfig.ProxyConfig {
ClusterName:
&meshconfig.ProxyConfig_ServiceCluster{ServiceCluster:
constants.ServiceClusterName},
DrainDuration: durationpb.New(45 * time.Second),
TerminationDrainDuration: durationpb.New(5 * time.Second),
- ProxyAdminPort: 15000,
- // TODO authpolicy
- DiscoveryAddress: "dubbod.dubbo-system.svc:15012",
- ControlPlaneAuthPolicy:
meshconfig.AuthenticationPolicy_MUTUAL_TLS,
- StatNameLength: 189,
- StatusPort: 15020,
+ DiscoveryAddress: "dubbod.dubbo-system.svc:15012",
+ ControlPlaneAuthPolicy:
meshconfig.AuthenticationPolicy_MUTUAL_TLS,
}
}
diff --git a/pkg/config/schema/collections/collections.go
b/pkg/config/schema/collections/collections.go
index d38deddd..a93de7cc 100644
--- a/pkg/config/schema/collections/collections.go
+++ b/pkg/config/schema/collections/collections.go
@@ -5,10 +5,88 @@ package collections
import (
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+ istioioapimetav1alpha1 "istio.io/api/meta/v1alpha1"
+ istioioapinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
+ istioioapisecurityv1beta1 "istio.io/api/security/v1beta1"
+ "reflect"
)
var (
+ DestinationRule = resource.Builder{
+ Identifier: "DestinationRule",
+ Group: "networking.istio.io",
+ Kind: "DestinationRule",
+ Plural: "destinationrules",
+ Version: "v1",
+ VersionAliases: []string{
+ "v1alpha3",
+ "v1beta1",
+ },
+ Proto: "istio.networking.v1alpha3.DestinationRule",
StatusProto: "istio.meta.v1alpha1.IstioStatus",
+ ReflectType:
reflect.TypeOf(&istioioapinetworkingv1alpha3.DestinationRule{}).Elem(),
StatusType: reflect.TypeOf(&istioioapimetav1alpha1.IstioStatus{}).Elem(),
+ ProtoPackage: "istio.io/api/networking/v1alpha3",
StatusPackage: "istio.io/api/meta/v1alpha1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: false,
+ }.MustBuild()
+
+ PeerAuthentication = resource.Builder{
+ Identifier: "PeerAuthentication",
+ Group: "security.istio.io",
+ Kind: "PeerAuthentication",
+ Plural: "peerauthentications",
+ Version: "v1",
+ VersionAliases: []string{
+ "v1beta1",
+ },
+ Proto: "istio.security.v1beta1.PeerAuthentication",
StatusProto: "istio.meta.v1alpha1.IstioStatus",
+ ReflectType:
reflect.TypeOf(&istioioapisecurityv1beta1.PeerAuthentication{}).Elem(),
StatusType: reflect.TypeOf(&istioioapimetav1alpha1.IstioStatus{}).Elem(),
+ ProtoPackage: "istio.io/api/security/v1beta1", StatusPackage:
"istio.io/api/meta/v1alpha1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: false,
+ }.MustBuild()
+
+ RequestAuthentication = resource.Builder{
+ Identifier: "RequestAuthentication",
+ Group: "security.istio.io",
+ Kind: "RequestAuthentication",
+ Plural: "requestauthentications",
+ Version: "v1",
+ VersionAliases: []string{
+ "v1beta1",
+ },
+ Proto: "istio.security.v1beta1.RequestAuthentication",
StatusProto: "istio.meta.v1alpha1.IstioStatus",
+ ReflectType:
reflect.TypeOf(&istioioapisecurityv1beta1.RequestAuthentication{}).Elem(),
StatusType: reflect.TypeOf(&istioioapimetav1alpha1.IstioStatus{}).Elem(),
+ ProtoPackage: "istio.io/api/security/v1beta1", StatusPackage:
"istio.io/api/meta/v1alpha1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: false,
+ }.MustBuild()
+
+ VirtualService = resource.Builder{
+ Identifier: "VirtualService",
+ Group: "networking.istio.io",
+ Kind: "VirtualService",
+ Plural: "virtualservices",
+ Version: "v1",
+ VersionAliases: []string{
+ "v1alpha3",
+ "v1beta1",
+ },
+ Proto: "istio.networking.v1alpha3.VirtualService", StatusProto:
"istio.meta.v1alpha1.IstioStatus",
+ ReflectType:
reflect.TypeOf(&istioioapinetworkingv1alpha3.VirtualService{}).Elem(),
StatusType: reflect.TypeOf(&istioioapimetav1alpha1.IstioStatus{}).Elem(),
+ ProtoPackage: "istio.io/api/networking/v1alpha3",
StatusPackage: "istio.io/api/meta/v1alpha1",
+ ClusterScoped: false,
+ Synthetic: false,
+ Builtin: false,
+ }.MustBuild()
+
Sail = collection.NewSchemasBuilder().
- // TODO MustAdd
+ MustAdd(PeerAuthentication).
+ MustAdd(RequestAuthentication).
+ MustAdd(VirtualService).
+ MustAdd(DestinationRule).
Build()
)
diff --git a/pkg/config/schema/gvk/resources.go
b/pkg/config/schema/gvk/resources.go
index 1e5d9e43..a610009d 100644
--- a/pkg/config/schema/gvk/resources.go
+++ b/pkg/config/schema/gvk/resources.go
@@ -40,6 +40,8 @@ var (
RequestAuthentication = config.GroupVersionKind{Group:
"security.dubbo.io", Version: "v1", Kind: "RequestAuthentication"}
PeerAuthentication = config.GroupVersionKind{Group:
"security.dubbo.io", Version: "v1", Kind: "PeerAuthentication"}
AuthorizationPolicy = config.GroupVersionKind{Group:
"security.dubbo.io", Version: "v1", Kind: "AuthorizationPolicy"}
+ DestinationRule = config.GroupVersionKind{Group:
"networking.dubbo.io", Version: "v1", Kind: "DestinationRule"}
+ VirtualService = config.GroupVersionKind{Group:
"networking.dubbo.io", Version: "v1", Kind: "VirtualService"}
)
func ToGVR(g config.GroupVersionKind) (schema.GroupVersionResource, bool) {
@@ -76,6 +78,10 @@ func ToGVR(g config.GroupVersionKind)
(schema.GroupVersionResource, bool) {
return gvr.PeerAuthentication, true
case AuthorizationPolicy:
return gvr.AuthorizationPolicy, true
+ case DestinationRule:
+ return gvr.DestinationRule, true
+ case VirtualService:
+ return gvr.VirtualService, true
}
return schema.GroupVersionResource{}, false
}
@@ -105,6 +111,12 @@ func FromGVR(g schema.GroupVersionResource)
(config.GroupVersionKind, bool) {
return PeerAuthentication, true
case gvr.RequestAuthentication:
return RequestAuthentication, true
+ case gvr.AuthorizationPolicy:
+ return AuthorizationPolicy, true
+ case gvr.VirtualService:
+ return VirtualService, true
+ case gvr.DestinationRule:
+ return DestinationRule, true
}
return config.GroupVersionKind{}, false
}
diff --git a/pkg/config/schema/gvr/resources.go
b/pkg/config/schema/gvr/resources.go
index a257a6a6..ba4c933e 100644
--- a/pkg/config/schema/gvr/resources.go
+++ b/pkg/config/schema/gvr/resources.go
@@ -38,6 +38,8 @@ var (
RequestAuthentication = schema.GroupVersionResource{Group:
"security.dubbo.io", Version: "v1", Resource: "requestauthentications"}
PeerAuthentication = schema.GroupVersionResource{Group:
"security.dubbo.io", Version: "v1", Resource: "peerauthentications"}
AuthorizationPolicy = schema.GroupVersionResource{Group:
"security.dubbo.io", Version: "v1", Resource: "authorizationpolicies"}
+ DestinationRule = schema.GroupVersionResource{Group:
"networking.dubbo.io", Version: "v1", Resource: "destinationrules"}
+ VirtualService = schema.GroupVersionResource{Group:
"networking.dubbo.io", Version: "v1", Resource: "virtualservices"}
)
func IsClusterScoped(g schema.GroupVersionResource) bool {
@@ -64,6 +66,10 @@ func IsClusterScoped(g schema.GroupVersionResource) bool {
return false
case AuthorizationPolicy:
return false
+ case DestinationRule:
+ return false
+ case VirtualService:
+ return false
}
return false
}
diff --git a/pkg/config/schema/kubeclient/common.go
b/pkg/config/schema/kubeclient/common.go
index 81da01fc..01ef6e3d 100644
--- a/pkg/config/schema/kubeclient/common.go
+++ b/pkg/config/schema/kubeclient/common.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
ktypes "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/typemap"
+ istioclient "istio.io/client-go/pkg/clientset/versioned"
kubeext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -45,6 +46,8 @@ type ClientGetter interface {
// Dynamic client.
Dynamic() dynamic.Interface
+ Dubbo() istioclient.Interface
+
// Metadata returns the Metadata kube client.
Metadata() metadata.Interface
diff --git a/pkg/config/schema/kubeclient/resource.gen.go
b/pkg/config/schema/kubeclient/resource.go
similarity index 90%
rename from pkg/config/schema/kubeclient/resource.gen.go
rename to pkg/config/schema/kubeclient/resource.go
index fdba2adc..12a9d02b 100644
--- a/pkg/config/schema/kubeclient/resource.gen.go
+++ b/pkg/config/schema/kubeclient/resource.go
@@ -24,6 +24,8 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/kube/informerfactory"
ktypes "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
"github.com/apache/dubbo-kubernetes/pkg/ptr"
+ apiistioioapinetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
+ apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
k8sioapiappsv1 "k8s.io/api/apps/v1"
k8sioapicertificatesv1 "k8s.io/api/certificates/v1"
@@ -78,6 +80,15 @@ func GetWriteClient[T runtime.Object](c ClientGetter,
namespace string) ktypes.W
return
c.Kube().CoreV1().Services(namespace).(ktypes.WriteAPI[T])
case *k8sioapicorev1.ServiceAccount:
return
c.Kube().CoreV1().ServiceAccounts(namespace).(ktypes.WriteAPI[T])
+ case *apiistioioapisecurityv1.RequestAuthentication:
+ return
c.Dubbo().SecurityV1().RequestAuthentications(namespace).(ktypes.WriteAPI[T])
+ case *apiistioioapisecurityv1.PeerAuthentication:
+ return
c.Dubbo().SecurityV1().PeerAuthentications(namespace).(ktypes.WriteAPI[T])
+ case *apiistioioapinetworkingv1.VirtualService:
+ return
c.Dubbo().NetworkingV1().VirtualServices(namespace).(ktypes.WriteAPI[T])
+ case *apiistioioapinetworkingv1.DestinationRule:
+ return
c.Dubbo().NetworkingV1().DestinationRules(namespace).(ktypes.WriteAPI[T])
+
default:
panic(fmt.Sprintf("Unknown type %T", ptr.Empty[T]()))
}
@@ -107,6 +118,14 @@ func gvrToObject(g schema.GroupVersionResource)
runtime.Object {
return &k8sioapiappsv1.StatefulSet{}
case gvr.ValidatingWebhookConfiguration:
return
&k8sioapiadmissionregistrationv1.ValidatingWebhookConfiguration{}
+ case gvr.RequestAuthentication:
+ return &apiistioioapisecurityv1.RequestAuthentication{}
+ case gvr.PeerAuthentication:
+ return &apiistioioapisecurityv1.PeerAuthentication{}
+ case gvr.VirtualService:
+ return &apiistioioapinetworkingv1.VirtualService{}
+ case gvr.DestinationRule:
+ return &apiistioioapinetworkingv1.DestinationRule{}
default:
panic(fmt.Sprintf("Unknown type %v", g))
}
diff --git a/pkg/config/schema/kubetypes/resources.go
b/pkg/config/schema/kubetypes/resources.go
index 7090e191..29962b0f 100644
--- a/pkg/config/schema/kubetypes/resources.go
+++ b/pkg/config/schema/kubetypes/resources.go
@@ -21,17 +21,32 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
istioioapimeshv1alpha1 "istio.io/api/mesh/v1alpha1"
+ apiistioioapinetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
+ apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
+ k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
k8sioapicorev1 "k8s.io/api/core/v1"
)
func getGvk(obj any) (config.GroupVersionKind, bool) {
switch obj.(type) {
+ case *apiistioioapisecurityv1.AuthorizationPolicy:
+ return gvk.AuthorizationPolicy, true
+ case *apiistioioapisecurityv1.PeerAuthentication:
+ return gvk.PeerAuthentication, true
+ case *apiistioioapisecurityv1.RequestAuthentication:
+ return gvk.RequestAuthentication, true
+ case *apiistioioapinetworkingv1.DestinationRule:
+ return gvk.DestinationRule, true
+ case *apiistioioapinetworkingv1.VirtualService:
+ return gvk.VirtualService, true
case *k8sioapicorev1.ConfigMap:
return gvk.ConfigMap, true
case *istioioapimeshv1alpha1.MeshConfig:
return gvk.MeshConfig, true
case *k8sioapicorev1.Namespace:
return gvk.Namespace, true
+ case *k8sioapiadmissionregistrationv1.ValidatingWebhookConfiguration:
+ return gvk.ValidatingWebhookConfiguration, true
default:
return config.GroupVersionKind{}, false
}
diff --git a/pkg/dubbo-agent/agent.go b/pkg/dubbo-agent/agent.go
index 409c57f7..04e44f51 100644
--- a/pkg/dubbo-agent/agent.go
+++ b/pkg/dubbo-agent/agent.go
@@ -24,7 +24,6 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/grpcxds"
"github.com/apache/dubbo-kubernetes/pkg/filewatcher"
- "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/security"
"github.com/apache/dubbo-kubernetes/security/pkg/nodeagent/cache"
"google.golang.org/grpc"
@@ -51,7 +50,6 @@ const (
type SDSServiceFactory = func(_ *security.Options, _ security.SecretManager, _
*mesh.PrivateKeyProvider) SDSService
type Proxy struct {
- Type model.NodeType
DNSDomain string
}
@@ -75,6 +73,7 @@ type AgentOptions struct {
XDSRootCerts string
MetadataDiscovery *bool
CARootCerts string
+ SDSFactory func(options *security.Options,
workloadSecretCache security.SecretManager, pkpConf *mesh.PrivateKeyProvider)
SDSService
}
func NewAgent(proxyConfig *mesh.ProxyConfig, agentOpts *AgentOptions, sopts
*security.Options) *Agent {
diff --git a/pkg/dubbo-agent/config/config.go b/pkg/dubbo-agent/config/config.go
index 1bd21a36..d215276b 100644
--- a/pkg/dubbo-agent/config/config.go
+++ b/pkg/dubbo-agent/config/config.go
@@ -5,18 +5,16 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/bootstrap"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/env"
- "google.golang.org/protobuf/types/known/wrapperspb"
"istio.io/api/annotation"
meshconfig "istio.io/api/mesh/v1alpha1"
"k8s.io/klog/v2"
"os"
- "runtime"
"strconv"
"strings"
)
// ConstructProxyConfig returns proxyConfig
-func ConstructProxyConfig(meshConfigFile, serviceCluster, proxyConfigEnv
string, concurrency int) (*meshconfig.ProxyConfig, error) {
+func ConstructProxyConfig(meshConfigFile, proxyConfigEnv string)
(*meshconfig.ProxyConfig, error) {
annotations, err := bootstrap.ReadPodAnnotations("")
if err != nil {
if os.IsNotExist(err) {
@@ -41,35 +39,6 @@ func ConstructProxyConfig(meshConfigFile, serviceCluster,
proxyConfigEnv string,
if meshConfig.DefaultConfig != nil {
proxyConfig = meshConfig.DefaultConfig
}
-
- // Concurrency wasn't explicitly set
- if proxyConfig.Concurrency == nil {
- // We want to detect based on CPU limit configured. If we are
running on a 100 core machine, but with
- // only 2 CPUs allocated, we want to have 2 threads, not 100,
or we will get excessively throttled.
- if CPULimit != 0 {
- klog.Infof("cpu limit detected as %v, setting
concurrency", CPULimit)
- proxyConfig.Concurrency =
wrapperspb.Int32(int32(CPULimit))
- }
- }
- // Respect the old flag, if they set it. This should never be set in
typical installation.
- if concurrency != 0 {
- klog.V(2).Infof("legacy --concurrency=%d flag detected; prefer
to use ProxyConfig", concurrency)
- proxyConfig.Concurrency = wrapperspb.Int32(int32(concurrency))
- }
-
- if proxyConfig.Concurrency.GetValue() == 0 {
- if CPULimit < runtime.NumCPU() {
- klog.V(2).Infof("concurrency is set to 0, which will
use a thread per CPU on the host. However, CPU limit is set lower. "+
- "This is not recommended and may lead to
performance issues. "+
- "CPU count: %d, CPU Limit: %d.",
runtime.NumCPU(), CPULimit)
- }
- }
-
- if x, ok :=
proxyConfig.GetClusterName().(*meshconfig.ProxyConfig_ServiceCluster); ok {
- if x.ServiceCluster == "" {
- proxyConfig.ClusterName =
&meshconfig.ProxyConfig_ServiceCluster{ServiceCluster: serviceCluster}
- }
- }
// TODO ResolveAddr
// TODO ValidateMeshConfigProxyConfig
return applyAnnotations(proxyConfig, annotations), nil
diff --git a/pkg/jwt/jwt.go b/pkg/jwt/jwt.go
index 9d27495b..670106d4 100644
--- a/pkg/jwt/jwt.go
+++ b/pkg/jwt/jwt.go
@@ -1,6 +1,5 @@
package jwt
const (
- PolicyThirdParty = "third-party-jwt"
PolicyFirstParty = "first-party-jwt"
)
diff --git a/pkg/kube/client.go b/pkg/kube/client.go
index 56f77b3c..7e5904f2 100644
--- a/pkg/kube/client.go
+++ b/pkg/kube/client.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/lazy"
"github.com/apache/dubbo-kubernetes/pkg/sleep"
"go.uber.org/atomic"
+ istioclient "istio.io/client-go/pkg/clientset/versioned"
kubeExtClient
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -61,6 +62,7 @@ type client struct {
clusterID cluster.ID
informerWatchesPending *atomic.Int32
started atomic.Bool
+ dubbo istioclient.Interface
crdWatcher kubetypes.CrdWatcher
fastSync bool
}
@@ -78,6 +80,8 @@ type Client interface {
Informers() informerfactory.InformerFactory
+ Dubbo() istioclient.Interface
+
ObjectFilter() kubetypes.DynamicObjectFilter
ClusterID() cluster.ID
@@ -143,6 +147,11 @@ func newClientInternal(clientFactory *clientFactory, opts
...ClientOption) (*cli
return nil, err
}
+ c.dubbo, err = istioclient.NewForConfig(c.config)
+ if err != nil {
+ return nil, err
+ }
+
c.extSet, err = kubeExtClient.NewForConfig(c.config)
if err != nil {
return nil, err
@@ -185,6 +194,19 @@ var (
_ CLIClient = &client{}
)
+func EnableCrdWatcher(c Client) Client {
+ if NewCrdWatcher == nil {
+ panic("NewCrdWatcher is unset. Likely the crd watcher library
is not imported anywhere")
+ }
+ if c.(*client).crdWatcher != nil {
+ panic("EnableCrdWatcher called twice for the same client")
+ }
+ c.(*client).crdWatcher = NewCrdWatcher(c)
+ return c
+}
+
+var NewCrdWatcher func(Client) kubetypes.CrdWatcher
+
func (c *client) Ext() kubeExtClient.Interface {
return c.extSet
}
@@ -209,6 +231,10 @@ func (c *client) Informers()
informerfactory.InformerFactory {
return c.informerFactory
}
+func (c *client) Dubbo() istioclient.Interface {
+ return c.dubbo
+}
+
func (c *client) ObjectFilter() kubetypes.DynamicObjectFilter {
return c.objectFilter
}
diff --git a/pkg/kube/controllers/queue.go b/pkg/kube/controllers/queue.go
index 929045fe..c3a2a178 100644
--- a/pkg/kube/controllers/queue.go
+++ b/pkg/kube/controllers/queue.go
@@ -45,6 +45,10 @@ func (q Queue) AddObject(obj Object) {
q.queue.Add(config.NamespacedName(obj))
}
+func (q Queue) HasSynced() bool {
+ return q.initialSync.Load()
+}
+
func WithRateLimiter(r workqueue.TypedRateLimiter[any]) func(q *Queue) {
return func(q *Queue) {
q.queue = workqueue.NewTypedRateLimitingQueue[any](r)
diff --git a/pkg/kube/kclient/client.go b/pkg/kube/kclient/client.go
index 2a340d73..2423cd8d 100644
--- a/pkg/kube/kclient/client.go
+++ b/pkg/kube/kclient/client.go
@@ -36,7 +36,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
+ "k8s.io/klog/v2"
"sync"
+ "sync/atomic"
)
type fullClient[T controllers.Object] struct {
@@ -52,6 +54,65 @@ func New[T controllers.ComparableObject](c kube.Client)
Client[T] {
return NewFiltered[T](c, Filter{})
}
+func NewUntypedInformer(c kube.Client, gvr schema.GroupVersionResource, filter
Filter) Untyped {
+ inf := kubeclient.GetInformerFilteredFromGVR(c, ToOpts(c, gvr, filter),
gvr)
+ return newInformerClient[controllers.Object](gvr, inf, filter)
+}
+
+func NewDelayedInformer[T controllers.ComparableObject](
+ c kube.Client,
+ gvr schema.GroupVersionResource,
+ informerType kubetypes.InformerType,
+ filter Filter,
+) Informer[T] {
+ watcher := c.CrdWatcher()
+ if watcher == nil {
+ klog.Info("NewDelayedInformer called without a CrdWatcher
enabled")
+ }
+ delay := newDelayedFilter(gvr, watcher)
+ inf := func() informerfactory.StartableInformer {
+ opts := ToOpts(c, gvr, filter)
+ opts.InformerType = informerType
+ return kubeclient.GetInformerFiltered[T](c, opts, gvr)
+ }
+ return newDelayedInformer[T](gvr, inf, delay, filter)
+}
+
+func newDelayedInformer[T controllers.ComparableObject](
+ gvr schema.GroupVersionResource,
+ getInf func() informerfactory.StartableInformer,
+ delay kubetypes.DelayedFilter,
+ filter Filter,
+) Informer[T] {
+ delayedClient := &delayedClient[T]{
+ inf: new(atomic.Pointer[Informer[T]]),
+ delayed: delay,
+ }
+
+ // If resource is not yet known, we will use the delayedClient.
+ // When the resource is later loaded, the callback will trigger and
swap our dummy delayedClient
+ // with a full client
+ readyNow := delay.KnownOrCallback(func(stop <-chan struct{}) {
+ // The inf() call is responsible for starting the informer
+ inf := getInf()
+ fc := &informerClient[T]{
+ informer: inf.Informer,
+ startInformer: inf.Start,
+ }
+ applyDynamicFilter(filter, gvr, fc)
+ inf.Start(stop)
+ klog.Infof("%v is now ready, building client",
gvr.GroupResource())
+ // Swap out the dummy client with the full one
+ delayedClient.set(fc)
+ })
+ if !readyNow {
+ klog.V(2).Infof("%v is not ready now, building delayed client",
gvr.GroupResource())
+ return delayedClient
+ }
+ klog.V(2).Infof("%v ready now, building client", gvr.GroupResource())
+ return newInformerClient[T](gvr, getInf(), filter)
+}
+
type Filter = kubetypes.Filter
func NewFiltered[T controllers.ComparableObject](c kube.Client, filter Filter)
Client[T] {
@@ -150,7 +211,7 @@ func (n *informerClient[T]) List(namespace string, selector
klabels.Selector) []
}
})
- if err != nil && features.EnableUnsafeAssertions {
+ if err != nil {
fmt.Printf("lister returned err for %v: %v", namespace, err)
}
return res
@@ -163,7 +224,7 @@ func (n *informerClient[T]) ListUnfiltered(namespace
string, selector klabels.Se
res = append(res, cast)
})
- if err != nil && features.EnableUnsafeAssertions {
+ if err != nil {
fmt.Printf("lister returned err for %v: %v", namespace, err)
}
return res
@@ -337,3 +398,10 @@ func (n *writeClient[T]) Delete(name, namespace string)
error {
api := kubeclient.GetWriteClient[T](n.client, namespace)
return api.Delete(context.Background(), name, metav1.DeleteOptions{})
}
+
+func NewMetadata(c kube.Client, gvr schema.GroupVersionResource, filter
Filter) Informer[*metav1.PartialObjectMetadata] {
+ opts := ToOpts(c, gvr, filter)
+ opts.InformerType = kubetypes.MetadataInformer
+ inf := kubeclient.GetInformerFilteredFromGVR(c, opts, gvr)
+ return newInformerClient[*metav1.PartialObjectMetadata](gvr, inf,
filter)
+}
diff --git a/pkg/kube/kclient/crdwatcher.go b/pkg/kube/kclient/crdwatcher.go
new file mode 100644
index 00000000..628f4985
--- /dev/null
+++ b/pkg/kube/kclient/crdwatcher.go
@@ -0,0 +1,120 @@
+package kclient
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/types"
+ "sync"
+)
+
+type crdWatcher struct {
+ crds Informer[*metav1.PartialObjectMetadata]
+ queue controllers.Queue
+ mutex sync.RWMutex
+ callbacks map[string][]func()
+
+ running chan struct{}
+ stop <-chan struct{}
+}
+
+func init() {
+ // Unfortunate hack needed to avoid circular imports
+ kube.NewCrdWatcher = newCrdWatcher
+}
+
+func newCrdWatcher(client kube.Client) kubetypes.CrdWatcher {
+ c := &crdWatcher{
+ running: make(chan struct{}),
+ callbacks: map[string][]func(){},
+ }
+
+ c.queue = controllers.NewQueue("crd watcher",
+ controllers.WithReconciler(c.Reconcile))
+ c.crds = NewMetadata(client, gvr.CustomResourceDefinition, Filter{
+ ObjectFilter:
kubetypes.NewStaticObjectFilter(minimumVersionFilter),
+ })
+ c.crds.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
+ return c
+}
+
+func minimumVersionFilter(t any) bool {
+ return true
+}
+
+func (c *crdWatcher) Reconcile(key types.NamespacedName) error {
+ c.mutex.Lock()
+ callbacks, f := c.callbacks[key.Name]
+ if !f {
+ c.mutex.Unlock()
+ return nil
+ }
+ // Delete them so we do not run again
+ delete(c.callbacks, key.Name)
+ c.mutex.Unlock()
+ for _, cb := range callbacks {
+ cb()
+ }
+ return nil
+}
+
+func (c *crdWatcher) known(s schema.GroupVersionResource) bool {
+ // From the spec: "Its name MUST be in the format
<.spec.name>.<.spec.group>."
+ name := fmt.Sprintf("%s.%s", s.Resource, s.Group)
+ return c.crds.Get(name, "") != nil
+}
+
+func (c *crdWatcher) KnownOrCallback(s schema.GroupVersionResource, f
func(stop <-chan struct{})) bool {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ // If we are already synced, return immediately if the CRD is present.
+ if c.crds.HasSynced() && c.known(s) {
+ // Already known, return early
+ return true
+ }
+ name := fmt.Sprintf("%s.%s", s.Resource, s.Group)
+ c.callbacks[name] = append(c.callbacks[name], func() {
+ // Call the callback
+ f(c.stop)
+ })
+ return false
+}
+
+func (c *crdWatcher) WaitForCRD(s schema.GroupVersionResource, stop <-chan
struct{}) bool {
+ done := make(chan struct{})
+ if c.KnownOrCallback(s, func(stop <-chan struct{}) {
+ close(done)
+ }) {
+ // Already known
+ return true
+ }
+ select {
+ case <-stop:
+ return false
+ case <-done:
+ return true
+ }
+}
+
+func (c *crdWatcher) HasSynced() bool {
+ return c.queue.HasSynced()
+}
+
+// Run starts the controller. This must be called.
+func (c *crdWatcher) Run(stop <-chan struct{}) {
+ c.mutex.Lock()
+ if c.stop != nil {
+ // Run already called. Because we call this from
client.RunAndWait this isn't uncommon
+ c.mutex.Unlock()
+ return
+ }
+ c.stop = stop
+ c.mutex.Unlock()
+ kube.WaitForCacheSync("crd watcher", stop, c.crds.HasSynced)
+ c.queue.Run(stop)
+ c.crds.ShutdownHandlers()
+}
diff --git a/pkg/kube/kclient/delayed.go b/pkg/kube/kclient/delayed.go
new file mode 100644
index 00000000..85a20832
--- /dev/null
+++ b/pkg/kube/kclient/delayed.go
@@ -0,0 +1,194 @@
+package kclient
+
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+ "github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+ klabels "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/tools/cache"
+ "sync"
+ "sync/atomic"
+)
+
+type delayedClient[T controllers.ComparableObject] struct {
+ inf *atomic.Pointer[Informer[T]]
+
+ delayed kubetypes.DelayedFilter
+
+ hm sync.Mutex
+ handlers []delayedHandler
+ indexers []delayedIndex[T]
+ started <-chan struct{}
+}
+
+type delayedHandler struct {
+ cache.ResourceEventHandler
+ hasSynced delayedHandlerRegistration
+}
+
+type delayedHandlerRegistration struct {
+ hasSynced *atomic.Pointer[func() bool]
+}
+
+type delayedIndex[T any] struct {
+ name string
+ indexer *atomic.Pointer[RawIndexer]
+ extract func(o T) []string
+}
+
+type delayedFilter struct {
+ Watcher kubetypes.CrdWatcher
+ Resource schema.GroupVersionResource
+}
+
+func newDelayedFilter(resource schema.GroupVersionResource, watcher
kubetypes.CrdWatcher) *delayedFilter {
+ return &delayedFilter{
+ Watcher: watcher,
+ Resource: resource,
+ }
+}
+
+func (d *delayedFilter) HasSynced() bool {
+ return d.Watcher.HasSynced()
+}
+
+func (d *delayedFilter) KnownOrCallback(f func(stop <-chan struct{})) bool {
+ return d.Watcher.KnownOrCallback(d.Resource, f)
+}
+
+func (s *delayedClient[T]) set(inf Informer[T]) {
+ if inf != nil {
+ s.inf.Swap(&inf)
+ s.hm.Lock()
+ defer s.hm.Unlock()
+ for _, h := range s.handlers {
+ reg := inf.AddEventHandler(h)
+ h.hasSynced.hasSynced.Store(ptr.Of(reg.HasSynced))
+ }
+ s.handlers = nil
+ for _, i := range s.indexers {
+ res := inf.Index(i.name, i.extract)
+ i.indexer.Store(&res)
+ }
+ s.indexers = nil
+ if s.started != nil {
+ inf.Start(s.started)
+ }
+ }
+}
+
+func (s *delayedClient[T]) Get(name, namespace string) T {
+ if c := s.inf.Load(); c != nil {
+ return (*c).Get(name, namespace)
+ }
+ return ptr.Empty[T]()
+}
+
+func (s *delayedClient[T]) List(namespace string, selector klabels.Selector)
[]T {
+ if c := s.inf.Load(); c != nil {
+ return (*c).List(namespace, selector)
+ }
+ return nil
+}
+
+func (s *delayedClient[T]) ListUnfiltered(namespace string, selector
klabels.Selector) []T {
+ if c := s.inf.Load(); c != nil {
+ return (*c).ListUnfiltered(namespace, selector)
+ }
+ return nil
+}
+
+func (s *delayedClient[T]) ShutdownHandlers() {
+ if c := s.inf.Load(); c != nil {
+ (*c).ShutdownHandlers()
+ } else {
+ s.hm.Lock()
+ defer s.hm.Unlock()
+ s.handlers = nil
+ }
+}
+
+func (s *delayedClient[T]) ShutdownHandler(registration
cache.ResourceEventHandlerRegistration) {
+ if c := s.inf.Load(); c != nil {
+ (*c).ShutdownHandlers()
+ } else {
+ s.hm.Lock()
+ defer s.hm.Unlock()
+ s.handlers = slices.FilterInPlace(s.handlers, func(handler
delayedHandler) bool {
+ return handler.hasSynced != registration
+ })
+ }
+}
+
+func (s *delayedClient[T]) Start(stop <-chan struct{}) {
+ if c := s.inf.Load(); c != nil {
+ (*c).Start(stop)
+ }
+ s.hm.Lock()
+ defer s.hm.Unlock()
+ s.started = stop
+}
+
+func (s *delayedClient[T]) HasSynced() bool {
+ if c := s.inf.Load(); c != nil {
+ return (*c).HasSynced()
+ }
+ // If we haven't loaded the informer yet, we want to check if the
delayed filter is synced.
+ // This ensures that at startup, we only return HasSynced=true if we
are sure the CRD is not ready.
+ hs := s.delayed.HasSynced()
+ return hs
+}
+
+func (s *delayedClient[T]) HasSyncedIgnoringHandlers() bool {
+ if c := s.inf.Load(); c != nil {
+ return (*c).HasSyncedIgnoringHandlers()
+ }
+ // If we haven't loaded the informer yet, we want to check if the
delayed filter is synced.
+ // This ensures that at startup, we only return HasSynced=true if we
are sure the CRD is not ready.
+ hs := s.delayed.HasSynced()
+ return hs
+}
+
+func (r delayedHandlerRegistration) HasSynced() bool {
+ if s := r.hasSynced.Load(); s != nil {
+ return (*s)()
+ }
+ return false
+}
+
+func (s *delayedClient[T]) AddEventHandler(h cache.ResourceEventHandler)
cache.ResourceEventHandlerRegistration {
+ if c := s.inf.Load(); c != nil {
+ return (*c).AddEventHandler(h)
+ }
+ s.hm.Lock()
+ defer s.hm.Unlock()
+
+ hasSynced := delayedHandlerRegistration{hasSynced:
new(atomic.Pointer[func() bool])}
+ hasSynced.hasSynced.Store(ptr.Of(s.delayed.HasSynced))
+ s.handlers = append(s.handlers, delayedHandler{
+ ResourceEventHandler: h,
+ hasSynced: hasSynced,
+ })
+ return hasSynced
+}
+
+func (d delayedIndex[T]) Lookup(key string) []interface{} {
+ if c := d.indexer.Load(); c != nil {
+ return (*c).Lookup(key)
+ }
+ // Not ready yet, return nil
+ return nil
+}
+
+func (s *delayedClient[T]) Index(name string, extract func(o T) []string)
RawIndexer {
+ if c := s.inf.Load(); c != nil {
+ return (*c).Index(name, extract)
+ }
+ s.hm.Lock()
+ defer s.hm.Unlock()
+ di := delayedIndex[T]{name: name, indexer:
new(atomic.Pointer[RawIndexer]), extract: extract}
+ s.indexers = append(s.indexers, di)
+ return di
+}
diff --git a/pkg/kube/krt/map.go b/pkg/kube/krt/map.go
new file mode 100644
index 00000000..defd97f0
--- /dev/null
+++ b/pkg/kube/krt/map.go
@@ -0,0 +1,144 @@
+package krt
+
+import (
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
+)
+
+type mapCollection[T any, U any] struct {
+ collectionName string
+ id collectionUID
+ collection internalCollection[T]
+ mapFunc func(T) U
+ metadata Metadata
+}
+
+type mappedIndexer[T any, U any] struct {
+ indexer indexer[T]
+ mapFunc func(T) U
+}
+
+func MapCollection[T, U any](
+ collection Collection[T],
+ mapFunc func(T) U,
+ opts ...CollectionOption,
+) Collection[U] {
+ o := buildCollectionOptions(opts...)
+ if o.name == "" {
+ o.name = fmt.Sprintf("Map[%v]", ptr.TypeName[T]())
+ }
+ ic := collection.(internalCollection[T])
+ metadata := o.metadata
+ if metadata == nil {
+ metadata = ic.Metadata()
+ }
+ m := &mapCollection[T, U]{
+ collectionName: o.name,
+ id: nextUID(),
+ collection: ic,
+ mapFunc: mapFunc,
+ metadata: metadata,
+ }
+ maybeRegisterCollectionForDebugging[U](m, o.debugger)
+ return m
+}
+
+func (m *mapCollection[T, U]) GetKey(k string) *U {
+ if obj := m.collection.GetKey(k); obj != nil {
+ return ptr.Of(m.mapFunc(*obj))
+ }
+ return nil
+}
+
+func (m *mapCollection[T, U]) List() []U {
+ vals := m.collection.List()
+ res := make([]U, 0, len(vals))
+ for _, obj := range vals {
+ res = append(res, m.mapFunc(obj))
+ }
+ if EnableAssertions {
+ for _, obj := range vals {
+ ok := GetKey(obj)
+ nk := GetKey(m.mapFunc(obj))
+ if nk != ok {
+ panic(fmt.Sprintf("Input and output key must be
the same for MapCollection %q %q", ok, nk))
+ }
+ }
+ }
+ return res
+}
+
+func (m *mapCollection[T, U]) HasSynced() bool {
+ return m.collection.HasSynced()
+}
+
+func (m *mapCollection[T, U]) WaitUntilSynced(stop <-chan struct{}) bool {
+ return m.collection.WaitUntilSynced(stop)
+}
+
+func (m *mapCollection[T, U]) name() string { return m.collectionName }
+
+// nolint: unused // (not true, its to implement an interface)
+func (m *mapCollection[T, U]) uid() collectionUID { return m.id }
+
+// nolint: unused // (not true, its to implement an interface)
+func (m *mapCollection[T, U]) dump() CollectionDump {
+ return CollectionDump{
+ Outputs: eraseMap(slices.GroupUnique(m.List(),
getTypedKey)),
+ Synced: m.HasSynced(),
+ InputCollection: m.collection.name(),
+ }
+}
+
+func (m *mapCollection[T, U]) augment(a any) any {
+ // not supported in this collection type
+ return a
+}
+
+func (m *mapCollection[T, U]) index(name string, extract func(o U) []string)
indexer[U] {
+ t := func(o T) []string {
+ return extract(m.mapFunc(o))
+ }
+ idxs := m.collection.index(name, t)
+ return &mappedIndexer[T, U]{
+ indexer: idxs,
+ mapFunc: m.mapFunc,
+ }
+}
+
+func (m *mappedIndexer[T, U]) Lookup(k string) []U {
+ keys := m.indexer.Lookup(k)
+ res := make([]U, 0, len(keys))
+ for _, obj := range keys {
+ res = append(res, m.mapFunc(obj))
+ }
+ return res
+}
+
+func (m *mapCollection[T, U]) Register(handler func(Event[U]))
HandlerRegistration {
+ return registerHandlerAsBatched(m, handler)
+}
+
+func (m *mapCollection[T, U]) RegisterBatch(handler func([]Event[U]),
runExistingState bool) HandlerRegistration {
+ return m.collection.RegisterBatch(func(t []Event[T]) {
+ events := make([]Event[U], 0, len(t))
+ for _, o := range t {
+ e := Event[U]{
+ Event: o.Event,
+ }
+ if o.Old != nil {
+ e.Old = ptr.Of(m.mapFunc(*o.Old))
+ }
+ if o.New != nil {
+ e.New = ptr.Of(m.mapFunc(*o.New))
+ }
+ events = append(events, e)
+ }
+ handler(events)
+ }, runExistingState)
+}
+
+func (m *mapCollection[T, U]) Metadata() Metadata {
+ return m.metadata
+}
diff --git a/pkg/kube/kubetypes/types.go b/pkg/kube/kubetypes/types.go
index 5b6aec02..a6140d3f 100644
--- a/pkg/kube/kubetypes/types.go
+++ b/pkg/kube/kubetypes/types.go
@@ -20,6 +20,7 @@ package kubetypes
import (
"context"
"github.com/apache/dubbo-kubernetes/pkg/cluster"
+ "github.com/apache/dubbo-kubernetes/pkg/slices"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -90,3 +91,44 @@ type CrdWatcher interface {
WaitForCRD(s schema.GroupVersionResource, stop <-chan struct{}) bool
Run(stop <-chan struct{})
}
+
+type composedFilter struct {
+ filter DynamicObjectFilter
+ extra []func(obj any) bool
+}
+
+func (f composedFilter) Filter(obj any) bool {
+ for _, filter := range f.extra {
+ if !filter(obj) {
+ return false
+ }
+ }
+ if f.filter != nil {
+ return f.filter.Filter(obj)
+ }
+ return true
+}
+
+func (f composedFilter) AddHandler(fn func(selected, deselected sets.String)) {
+ if f.filter != nil {
+ f.filter.AddHandler(fn)
+ }
+}
+
+func ComposeFilters(filter DynamicObjectFilter, extra ...func(obj any) bool)
DynamicObjectFilter {
+ return composedFilter{
+ filter: filter,
+ extra: slices.FilterInPlace(extra, func(f func(obj any) bool)
bool {
+ return f != nil
+ }),
+ }
+}
+
+func NewStaticObjectFilter(f func(obj any) bool) DynamicObjectFilter {
+ return staticFilter{f}
+}
+
+type DelayedFilter interface {
+ HasSynced() bool
+ KnownOrCallback(f func(stop <-chan struct{})) bool
+}
diff --git a/pkg/model/proxy.go b/pkg/model/proxy.go
deleted file mode 100644
index fd275798..00000000
--- a/pkg/model/proxy.go
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-// NodeType decides the responsibility of the proxy serves in the mesh
-type NodeType string
-
-const (
- // SidecarProxy type is used for sidecar proxies in the application
containers
- SidecarProxy NodeType = "sidecar"
-
- // Router type is used for standalone proxies acting as L7/L4 routers
- Router NodeType = "router"
-)
-
-// IsApplicationNodeType verifies that the NodeType is one of the declared
constants in the model
-func IsApplicationNodeType(nType NodeType) bool {
- switch nType {
- case SidecarProxy, Router:
- return true
- default:
- return false
- }
-}
diff --git a/pkg/security/security.go b/pkg/security/security.go
index 4476e0be..5f26199e 100644
--- a/pkg/security/security.go
+++ b/pkg/security/security.go
@@ -105,24 +105,23 @@ type Caller struct {
}
type Options struct {
- ServeOnlyFiles bool
- ProvCert string
- FileMountedCerts bool
- SailCertProvider string
- OutputKeyCertToDir string
- CertChainFilePath string
- KeyFilePath string
- RootCertFilePath string
- CARootPath string
- CAEndpoint string
- CAProviderName string
- CredFetcher CredFetcher
- CAHeaders map[string]string
- CAEndpointSAN string
- CertSigner string
- ClusterID string
- CredIdentityProvider string
- TrustDomain string
+ ServeOnlyFiles bool
+ ProvCert string
+ FileMountedCerts bool
+ SailCertProvider string
+ OutputKeyCertToDir string
+ CertChainFilePath string
+ KeyFilePath string
+ RootCertFilePath string
+ CARootPath string
+ CAEndpoint string
+ CAProviderName string
+ CredFetcher CredFetcher
+ CAHeaders map[string]string
+ CAEndpointSAN string
+ CertSigner string
+ ClusterID string
+ TrustDomain string
}
type CredFetcher interface {
diff --git a/pkg/webhooks/server/server.go b/pkg/webhooks/server/server.go
new file mode 100644
index 00000000..9612ec72
--- /dev/null
+++ b/pkg/webhooks/server/server.go
@@ -0,0 +1,237 @@
+package server
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+ "github.com/apache/dubbo-kubernetes/pkg/config/validation"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/crd"
+ "github.com/hashicorp/go-multierror"
+ admissionv1 "k8s.io/api/admission/v1"
+ kubeApiAdmissionv1beta1 "k8s.io/api/admission/v1beta1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/serializer"
+ "k8s.io/klog/v2"
+ "net/http"
+)
+
+var (
+ runtimeScheme = runtime.NewScheme()
+ codecs = serializer.NewCodecFactory(runtimeScheme)
+ deserializer = codecs.UniversalDeserializer()
+
+ // Expect AdmissionRequest to only include these top-level field names
+ validFields = map[string]bool{
+ "apiVersion": true,
+ "kind": true,
+ "metadata": true,
+ "spec": true,
+ "status": true,
+ }
+)
+
+func init() {
+ _ = admissionv1.AddToScheme(runtimeScheme)
+ _ = kubeApiAdmissionv1beta1.AddToScheme(runtimeScheme)
+}
+
+type Options struct {
+ Schemas collection.Schemas
+ DomainSuffix string
+ Port uint
+ Mux *http.ServeMux
+}
+
+// String produces a stringified version of the arguments for debugging.
+func (o Options) String() string {
+ buf := &bytes.Buffer{}
+
+ _, _ = fmt.Fprintf(buf, "DomainSuffix: %s\n", o.DomainSuffix)
+ _, _ = fmt.Fprintf(buf, "Port: %d\n", o.Port)
+
+ return buf.String()
+}
+
+type Webhook struct {
+ schemas collection.Schemas
+ domainSuffix string
+}
+
+// New creates a new instance of the admission webhook server.
+func New(o Options) (*Webhook, error) {
+ if o.Mux == nil {
+ return nil, errors.New("expected mux to be passed, but was not
passed")
+ }
+ wh := &Webhook{
+ schemas: o.Schemas,
+ domainSuffix: o.DomainSuffix,
+ }
+
+ o.Mux.HandleFunc("/validate", wh.serveValidate)
+ o.Mux.HandleFunc("/validate/", wh.serveValidate)
+
+ return wh, nil
+}
+
+func (wh *Webhook) serveValidate(w http.ResponseWriter, r *http.Request) {
+ serve(w, r, wh.validate)
+}
+
+func toAdmissionResponse(err error) *kube.AdmissionResponse {
+ return &kube.AdmissionResponse{Result: &metav1.Status{Message:
err.Error()}}
+}
+
+func (wh *Webhook) validate(request *kube.AdmissionRequest)
*kube.AdmissionResponse {
+ isDryRun := request.DryRun != nil && *request.DryRun
+ addDryRunMessageIfNeeded := func(errStr string) error {
+ err := fmt.Errorf("%s", errStr)
+ if isDryRun {
+ err = fmt.Errorf("%s (dry run)", err)
+ }
+ return err
+ }
+ switch request.Operation {
+ case kube.Create, kube.Update:
+ default:
+ klog.Warningf("Unsupported webhook operation %v",
addDryRunMessageIfNeeded(request.Operation))
+ return &kube.AdmissionResponse{Allowed: true}
+ }
+
+ var obj crd.DubboKind
+ if err := json.Unmarshal(request.Object.Raw, &obj); err != nil {
+ klog.Infof("cannot decode configuration: %v",
addDryRunMessageIfNeeded(err.Error()))
+ return toAdmissionResponse(fmt.Errorf("cannot decode
configuration: %v", err))
+ }
+
+ gvk := obj.GroupVersionKind()
+
+ // "Version" is not relevant for Istio types; each version has the same
schema. So do a lookup that does not consider
+ // version. This ensures if a new version comes out and Istiod is not
updated, we won't reject it.
+ s, exists :=
wh.schemas.FindByGroupKind(resource.FromKubernetesGVK(&gvk))
+ if !exists {
+ klog.Infof("unrecognized type %v",
addDryRunMessageIfNeeded(obj.GroupVersionKind().String()))
+ return toAdmissionResponse(fmt.Errorf("unrecognized type %v",
obj.GroupVersionKind()))
+ }
+
+ out, err := crd.ConvertObject(s, &obj, wh.domainSuffix)
+ if err != nil {
+ klog.Infof("error decoding configuration: %v",
addDryRunMessageIfNeeded(err.Error()))
+ return toAdmissionResponse(fmt.Errorf("error decoding
configuration: %v", err))
+ }
+
+ warnings, err := s.ValidateConfig(*out)
+ if err != nil {
+ if _, f := out.Annotations[constants.AlwaysReject]; !f {
+ // Hide error message if it was intentionally rejected
(by our own internal call)
+ klog.Infof("configuration is invalid: %v",
addDryRunMessageIfNeeded(err.Error()))
+ }
+ return toAdmissionResponse(fmt.Errorf("configuration is
invalid: %v", err))
+ }
+
+ if _, err := checkFields(request.Object.Raw, request.Kind.Kind,
request.Namespace, obj.Name); err != nil {
+ return toAdmissionResponse(err)
+ }
+ return &kube.AdmissionResponse{Allowed: true, Warnings:
toKubeWarnings(warnings)}
+}
+
+func toKubeWarnings(warn validation.Warning) []string {
+ if warn == nil {
+ return nil
+ }
+ me, ok := warn.(*multierror.Error)
+ if ok {
+ res := []string{}
+ for _, e := range me.Errors {
+ res = append(res, e.Error())
+ }
+ return res
+ }
+ return []string{warn.Error()}
+}
+
+type admitFunc func(*kube.AdmissionRequest) *kube.AdmissionResponse
+
+func serve(w http.ResponseWriter, r *http.Request, admit admitFunc) {
+ var body []byte
+ if r.Body != nil {
+ if data, err := kube.HTTPConfigReader(r); err == nil {
+ body = data
+ } else {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ }
+ if len(body) == 0 {
+ http.Error(w, "no body found", http.StatusBadRequest)
+ return
+ }
+
+ // verify the content type is accurate
+ contentType := r.Header.Get("Content-Type")
+ if contentType != "application/json" {
+ http.Error(w, "invalid Content-Type, want `application/json`",
http.StatusUnsupportedMediaType)
+ return
+ }
+
+ var reviewResponse *kube.AdmissionResponse
+ var obj runtime.Object
+ var ar *kube.AdmissionReview
+ if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
+ reviewResponse = toAdmissionResponse(fmt.Errorf("could not
decode body: %v", err))
+ } else {
+ ar, err = kube.AdmissionReviewKubeToAdapter(out)
+ if err != nil {
+ reviewResponse = toAdmissionResponse(fmt.Errorf("could
not decode object: %v", err))
+ } else {
+ reviewResponse = admit(ar.Request)
+ }
+ }
+
+ response := kube.AdmissionReview{}
+ response.Response = reviewResponse
+ var responseKube runtime.Object
+ var apiVersion string
+ if ar != nil {
+ apiVersion = ar.APIVersion
+ response.TypeMeta = ar.TypeMeta
+ if response.Response != nil {
+ if ar.Request != nil {
+ response.Response.UID = ar.Request.UID
+ }
+ }
+ }
+ responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
+ resp, err := json.Marshal(responseKube)
+ if err != nil {
+ http.Error(w, fmt.Sprintf("could encode response: %v", err),
http.StatusInternalServerError)
+ return
+ }
+ if _, err := w.Write(resp); err != nil {
+ http.Error(w, fmt.Sprintf("could write response: %v", err),
http.StatusInternalServerError)
+ }
+}
+
+func checkFields(raw []byte, kind string, namespace string, name string)
(string, error) {
+ trial := make(map[string]json.RawMessage)
+ if err := json.Unmarshal(raw, &trial); err != nil {
+ klog.Errorf("cannot decode configuration fields: %v", err)
+ return "yaml_decode_error", fmt.Errorf("cannot decode
configuration fields: %v", err)
+ }
+
+ for key := range trial {
+ if _, ok := validFields[key]; !ok {
+ klog.Infof("unknown field %q on %s resource %s/%s",
+ key, kind, namespace, name)
+ return "invalid_resource", fmt.Errorf("unknown field %q
on %s resource %s/%s",
+ key, kind, namespace, name)
+ }
+ }
+
+ return "", nil
+}
diff --git a/pkg/webhooks/validation/controller/controller.go
b/pkg/webhooks/validation/controller/controller.go
new file mode 100644
index 00000000..01208422
--- /dev/null
+++ b/pkg/webhooks/validation/controller/controller.go
@@ -0,0 +1,189 @@
+package controller
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/ptr"
+ "github.com/apache/dubbo-kubernetes/pkg/webhooks/util"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/keycertbundle"
+ "istio.io/api/label"
+ kubeApiAdmission "k8s.io/api/admissionregistration/v1"
+ klabels "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/klog/v2"
+ "math"
+ "time"
+)
+
+type Options struct {
+ // Istio system namespace where istiod resides.
+ WatchedNamespace string
+
+ // File path to the x509 certificate bundle used by the webhook server
+ // and patched into the webhook config.
+ CABundleWatcher *keycertbundle.Watcher
+
+ // Revision for control plane performing patching on the validating
webhook.
+ Revision string
+
+ // Name of the service running the webhook server.
+ ServiceName string
+}
+
+type Controller struct {
+ o Options
+ client kube.Client
+
+ queue controllers.Queue
+ dryRunOfInvalidConfigRejected bool
+ webhooks
kclient.Client[*kubeApiAdmission.ValidatingWebhookConfiguration]
+}
+
+func NewValidatingWebhookController(client kube.Client, ns string,
caBundleWatcher *keycertbundle.Watcher) *Controller {
+ o := Options{
+ WatchedNamespace: ns,
+ CABundleWatcher: caBundleWatcher,
+ ServiceName: "dubbod",
+ }
+ return newController(o, client)
+}
+
+func caBundleUpdateRequired(current
*kubeApiAdmission.ValidatingWebhookConfiguration, caBundle []byte) bool {
+ for _, wh := range current.Webhooks {
+ if !bytes.Equal(wh.ClientConfig.CABundle, caBundle) {
+ return true
+ }
+ }
+ return false
+}
+
+func failurePolicyIsIgnore(current
*kubeApiAdmission.ValidatingWebhookConfiguration) bool {
+ for _, wh := range current.Webhooks {
+ if wh.FailurePolicy != nil && *wh.FailurePolicy !=
kubeApiAdmission.Fail {
+ return true
+ }
+ }
+ return false
+}
+
+func (c *Controller) readyForFailClose() bool {
+ if !c.dryRunOfInvalidConfigRejected {
+ klog.Info("Endpoint successfully rejected invalid config.
Switching to fail-close.")
+ c.dryRunOfInvalidConfigRejected = true
+ // Sync all webhooks; this ensures if we have multiple webhooks
all of them are updated
+ c.syncAll()
+ }
+ return true
+}
+
+func (c *Controller) updateValidatingWebhookConfiguration(current
*kubeApiAdmission.ValidatingWebhookConfiguration, caBundle []byte) error {
+ caChangeNeeded := caBundleUpdateRequired(current, caBundle)
+ failurePolicyMaybeNeedsUpdate := failurePolicyIsIgnore(current)
+ if !caChangeNeeded && !failurePolicyMaybeNeedsUpdate {
+ klog.V(2).Info("up-to-date, no change required")
+ return nil
+ }
+ updateFailurePolicy := true
+ // Only check readyForFailClose if we need to switch, to avoid
redundant calls
+ if failurePolicyMaybeNeedsUpdate && !c.readyForFailClose() {
+ klog.V(2).Info("failurePolicy is Ignore, but webhook is not
ready; not setting to Fail")
+ updateFailurePolicy = false
+ }
+ updated := current.DeepCopy()
+ for i := range updated.Webhooks {
+ updated.Webhooks[i].ClientConfig.CABundle = caBundle
+ if updateFailurePolicy {
+ updated.Webhooks[i].FailurePolicy =
ptr.Of(kubeApiAdmission.Fail)
+ }
+ }
+
+ _, err := c.webhooks.Update(updated)
+ if err != nil {
+ klog.Errorf("failed to updated: %v", err)
+ return fmt.Errorf("fail to update webhook: %v", err)
+ }
+
+ if !updateFailurePolicy {
+ return fmt.Errorf("webhook is not ready, retry")
+ }
+ return nil
+}
+
+func (c *Controller) Reconcile(key types.NamespacedName) error {
+ name := key.Name
+ whc := c.webhooks.Get(name, "")
+ // Stop early if webhook is not present, rather than attempting (and
failing) to reconcile permanently
+ // If the webhook is later added a new reconciliation request will
trigger it to update
+ if whc == nil {
+ klog.Info("Skip patching webhook, not found")
+ return nil
+ }
+
+ klog.V(2).Info("Reconcile(enter)")
+ defer func() { klog.V(2).Info("Reconcile(exit)") }()
+
+ caBundle, err := util.LoadCABundle(c.o.CABundleWatcher)
+ if err != nil {
+ klog.Errorf("Failed to load CA bundle: %v", err)
+ return nil
+ }
+ return c.updateValidatingWebhookConfiguration(whc, caBundle)
+}
+
+func newController(o Options, client kube.Client) *Controller {
+ c := &Controller{
+ o: o,
+ client: client,
+ }
+
+ c.queue = controllers.NewQueue("validation",
+ controllers.WithReconciler(c.Reconcile),
+ // Webhook patching has to be retried forever. But the retries
would be rate limited.
+ controllers.WithMaxAttempts(math.MaxInt),
+ // Retry with backoff. Failures could be from conflicts of
other instances (quick retry helps), or
+ // longer lasting concerns which will eventually be retried on
1min interval.
+ // Unlike the mutating webhook controller, we do not use
NewItemFastSlowRateLimiter. This is because
+ // the validation controller waits for its own service to be
ready, so typically this takes a few seconds
+ // before we are ready; using FastSlow means we tend to always
take the Slow time (1min).
+
controllers.WithRateLimiter(workqueue.NewTypedItemExponentialFailureRateLimiter[any](100*time.Millisecond,
1*time.Minute)))
+
+ c.webhooks =
kclient.NewFiltered[*kubeApiAdmission.ValidatingWebhookConfiguration](client,
kclient.Filter{
+ LabelSelector: fmt.Sprintf("%s=%s", label.IoIstioRev.Name,
o.Revision),
+ })
+ c.webhooks.AddEventHandler(controllers.ObjectHandler(c.queue.AddObject))
+
+ return c
+}
+
+func (c *Controller) Run(stop <-chan struct{}) {
+ kube.WaitForCacheSync("validation", stop, c.webhooks.HasSynced)
+ go c.startCaBundleWatcher(stop)
+ c.queue.Run(stop)
+}
+
+func (c *Controller) startCaBundleWatcher(stop <-chan struct{}) {
+ if c.o.CABundleWatcher == nil {
+ return
+ }
+ id, watchCh := c.o.CABundleWatcher.AddWatcher()
+ defer c.o.CABundleWatcher.RemoveWatcher(id)
+
+ for {
+ select {
+ case <-watchCh:
+ c.syncAll()
+ case <-stop:
+ return
+ }
+ }
+}
+
+func (c *Controller) syncAll() {
+ for _, whc := range c.webhooks.List("", klabels.Everything()) {
+ c.queue.AddObject(whc)
+ }
+}
diff --git a/pkg/xds/server.go b/pkg/xds/server.go
index 3a213c9d..f0aa72ab 100644
--- a/pkg/xds/server.go
+++ b/pkg/xds/server.go
@@ -20,7 +20,6 @@ package xds
import (
"github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/util/sets"
- "github.com/apache/dubbo-kubernetes/sail/pkg/features"
dubbogrpc "github.com/apache/dubbo-kubernetes/sail/pkg/grpc"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery
"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -235,7 +234,7 @@ func ShouldRespond(w Watcher, id string, request
*discovery.DiscoveryRequest) (b
// A nonce becomes stale following a newer nonce being sent to Envoy.
// previousInfo.NonceSent can be empty if we previously had
shouldRespond=true but didn't send any resources.
if request.ResponseNonce != previousInfo.NonceSent {
- if features.EnableUnsafeAssertions && previousInfo.NonceSent ==
"" {
+ if previousInfo.NonceSent == "" {
// Assert we do not end up in an invalid state
klog.V(2).Infof("ADS:%s: REQ %s Expired nonce received
%s, but we never sent any nonce", stype,
id, request.ResponseNonce)
diff --git a/sail/cmd/sail-agent/app/cmd.go b/sail/cmd/sail-agent/app/cmd.go
index 27d97f53..850d9686 100644
--- a/sail/cmd/sail-agent/app/cmd.go
+++ b/sail/cmd/sail-agent/app/cmd.go
@@ -22,10 +22,8 @@ import (
"errors"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/cmd"
- "github.com/apache/dubbo-kubernetes/pkg/config/constants"
dubboagent "github.com/apache/dubbo-kubernetes/pkg/dubbo-agent"
"github.com/apache/dubbo-kubernetes/pkg/dubbo-agent/config"
- "github.com/apache/dubbo-kubernetes/pkg/model"
"github.com/apache/dubbo-kubernetes/pkg/util/protomarshal"
"github.com/apache/dubbo-kubernetes/sail/cmd/sail-agent/options"
"github.com/spf13/cobra"
@@ -40,7 +38,7 @@ func NewRootCommand(sds dubboagent.SDSServiceFactory)
*cobra.Command {
rootCmd := &cobra.Command{
Use: "sail-agent",
Short: "Dubbo Sail agent.",
- Long: "Dubbo Sail agent runs in the sidecar or gateway
container and bootstraps Envoy.",
+ Long: "Dubbo Sail agent bootstraps via gRPC xDS.",
SilenceUsage: true,
FParseErrWhitelist: cobra.FParseErrWhitelist{
// Allow unknown flags for backward-compatibility.
@@ -66,12 +64,8 @@ func newProxyCommand(sds dubboagent.SDSServiceFactory)
*cobra.Command {
},
RunE: func(c *cobra.Command, args []string) error {
cmd.PrintFlags(c.Flags())
- err := initProxy(args)
- if err != nil {
- return err
- }
- proxyConfig, err :=
config.ConstructProxyConfig(proxyArgs.MeshConfigFile, proxyArgs.ServiceCluster,
options.ProxyConfigEnv, proxyArgs.Concurrency)
+ proxyConfig, err :=
config.ConstructProxyConfig(proxyArgs.MeshConfigFile, options.ProxyConfigEnv)
if err != nil {
return fmt.Errorf("failed to get proxy config:
%v", err)
}
@@ -106,17 +100,6 @@ func newProxyCommand(sds dubboagent.SDSServiceFactory)
*cobra.Command {
}
}
-func initProxy(args []string) error {
- proxyArgs.Type = model.SidecarProxy
- if len(args) > 0 {
- proxyArgs.Type = model.NodeType(args[0])
- if !model.IsApplicationNodeType(proxyArgs.Type) {
- return fmt.Errorf("invalid proxy Type: %s",
string(proxyArgs.Type))
- }
- }
- return nil
-}
-
func addFlags(proxyCmd *cobra.Command) {
proxyArgs = options.NewProxyArgs()
proxyCmd.PersistentFlags().StringVar(&proxyArgs.DNSDomain, "domain", "",
@@ -124,5 +107,4 @@ func addFlags(proxyCmd *cobra.Command) {
proxyCmd.PersistentFlags().StringVar(&proxyArgs.MeshConfigFile,
"meshConfig", "./etc/dubbo/config/mesh",
"File name for Dubbo mesh configuration. If not specified, a
default mesh will be used. This may be overridden by "+
"PROXY_CONFIG environment variable or
proxy.dubbo.io/config annotation.")
- proxyCmd.PersistentFlags().StringVar(&proxyArgs.ServiceCluster,
"serviceCluster", constants.ServiceClusterName, "Service cluster")
}
diff --git a/sail/cmd/sail-agent/app/wait.go b/sail/cmd/sail-agent/app/wait.go
index c43b1157..4fb7e2eb 100644
--- a/sail/cmd/sail-agent/app/wait.go
+++ b/sail/cmd/sail-agent/app/wait.go
@@ -21,39 +21,36 @@ import (
"fmt"
"github.com/spf13/cobra"
"io"
+ "k8s.io/klog/v2"
"net/http"
"time"
)
var (
- timeoutSeconds int
requestTimeoutMillis int
periodMillis int
url string
waitCmd = &cobra.Command{
Use: "wait",
- Short: "Waits until the Envoy proxy is ready",
+ Short: "Waits until the Proxy XDS is ready",
RunE: func(c *cobra.Command, args []string) error {
client := &http.Client{
Timeout: time.Duration(requestTimeoutMillis) *
time.Millisecond,
}
- fmt.Printf("Waiting for Envoy proxy to be ready
(timeout: %d seconds)...", timeoutSeconds)
+ klog.Infof("Waiting for Proxy XDS to be ready")
var err error
- timeout := time.After(time.Duration(timeoutSeconds) *
time.Second)
for {
select {
- case <-timeout:
- return fmt.Errorf("timeout waiting for
Envoy proxy to become ready. Last error: %v", err)
case <-time.After(time.Duration(periodMillis) *
time.Millisecond):
err = checkIfReady(client, url)
if err == nil {
- fmt.Println("Envoy is ready!")
+ klog.Info("Proxy XDS is ready")
return nil
}
- fmt.Printf("Not ready yet: %v", err)
+ klog.Errorf("Not ready yet: %v\n", err)
}
}
},
@@ -79,3 +76,9 @@ func checkIfReady(client *http.Client, url string) error {
}
return nil
}
+
+func init() {
+ waitCmd.PersistentFlags().IntVar(&requestTimeoutMillis,
"requestTimeoutMillis", 500, "number of milliseconds to wait for response")
+ waitCmd.PersistentFlags().IntVar(&periodMillis, "periodMillis", 500,
"number of milliseconds to wait between attempts")
+ waitCmd.PersistentFlags().StringVar(&url, "url",
"http://localhost:15020/healthz/ready", "URL to use in requests")
+}
diff --git a/sail/cmd/sail-agent/options/agent.go
b/sail/cmd/sail-agent/options/agent.go
index ae487025..0d602d36 100644
--- a/sail/cmd/sail-agent/options/agent.go
+++ b/sail/cmd/sail-agent/options/agent.go
@@ -11,6 +11,9 @@ const xdsHeaderPrefix = "XDS_HEADER_"
func NewAgentOptions(proxy *ProxyArgs, cfg *meshconfig.ProxyConfig, sds
dubboagent.SDSServiceFactory) *dubboagent.AgentOptions {
o := &dubboagent.AgentOptions{
+ XDSHeaders: map[string]string{},
+ GRPCBootstrapPath: grpcBootstrapEnv,
+ SDSFactory: sds,
WorkloadIdentitySocketFile: workloadIdentitySocketFile,
}
extractXDSHeadersFromEnv(o)
diff --git a/sail/cmd/sail-agent/options/agent_proxy.go
b/sail/cmd/sail-agent/options/agent_proxy.go
index 0b6e9f36..ad507828 100644
--- a/sail/cmd/sail-agent/options/agent_proxy.go
+++ b/sail/cmd/sail-agent/options/agent_proxy.go
@@ -26,7 +26,6 @@ type ProxyArgs struct {
StsPort int
TokenManagerPlugin string
MeshConfigFile string
- ServiceCluster string
}
func NewProxyArgs() ProxyArgs {
diff --git a/sail/cmd/sail-agent/options/options.go
b/sail/cmd/sail-agent/options/options.go
index 6f4bdd3a..6088635a 100644
--- a/sail/cmd/sail-agent/options/options.go
+++ b/sail/cmd/sail-agent/options/options.go
@@ -2,28 +2,23 @@ package options
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/env"
"github.com/apache/dubbo-kubernetes/pkg/jwt"
"github.com/apache/dubbo-kubernetes/pkg/security"
+ "path/filepath"
)
var (
- ProxyConfigEnv = env.Register(
- "PROXY_CONFIG",
- "",
- "The proxy configuration. This will be set by the injection -
gateways will use file mounts.",
- ).Get()
dubbodSAN = env.Register("DUBBOD_SAN", "",
- "Override the ServerName used to validate Istiod certificate. "+
+ "Override the ServerName used to validate Dubbod certificate. "+
"Can be used as an alternative to setting /etc/hosts
for VMs - discovery address will be an IP:port")
jwtPolicy = env.Register("JWT_POLICY", jwt.PolicyThirdParty,
"The JWT validation policy.")
workloadIdentitySocketFile =
env.Register("WORKLOAD_IDENTITY_SOCKET_FILE",
security.DefaultWorkloadIdentitySocketFile,
fmt.Sprintf("SPIRE workload identity SDS socket filename. If
set, an SDS socket with this name must exist at %s",
security.WorkloadIdentityPath)).Get()
- credFetcherTypeEnv = env.Register("CREDENTIAL_FETCHER_TYPE",
security.JWT,
- "The type of the credential fetcher. Currently supported types
include GoogleComputeEngine").Get()
- credIdentityProvider = env.Register("CREDENTIAL_IDENTITY_PROVIDER",
"GoogleComputeEngine",
- "The identity provider for credential. Currently default
supported identity provider is GoogleComputeEngine").Get()
+ grpcBootstrapEnv = env.Register("GRPC_XDS_BOOTSTRAP",
filepath.Join(constants.ConfigPathDir, "grpc-bootstrap.json"),
+ "Path where gRPC expects to read a bootstrap file. Agent will
generate one if set.").Get()
caProviderEnv = env.Register("CA_PROVIDER", "Aegis", "name of
authentication provider").Get()
caEndpointEnv = env.Register("CA_ADDR", "", "Address of the spiffe
certificate provider. Defaults to discoveryAddress").Get()
)
diff --git a/sail/cmd/sail-agent/options/security.go
b/sail/cmd/sail-agent/options/security.go
index 2e1322fd..d8c8b4f5 100644
--- a/sail/cmd/sail-agent/options/security.go
+++ b/sail/cmd/sail-agent/options/security.go
@@ -2,10 +2,8 @@ package options
import (
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/jwt"
"github.com/apache/dubbo-kubernetes/pkg/security"
- "github.com/apache/dubbo-kubernetes/security/pkg/credentialfetcher"
meshconfig "istio.io/api/mesh/v1alpha1"
"k8s.io/klog/v2"
"os"
@@ -20,8 +18,7 @@ func NewSecurityOptions(proxyConfig *meshconfig.ProxyConfig,
stsPort int, tokenM
CAProviderName: caProviderEnv,
}
- o, err := SetupSecurityOptions(proxyConfig, o, jwtPolicy.Get(),
- credFetcherTypeEnv, credIdentityProvider)
+ o, err := SetupSecurityOptions(proxyConfig, o, jwtPolicy.Get())
if err != nil {
return o, err
}
@@ -31,21 +28,13 @@ func NewSecurityOptions(proxyConfig
*meshconfig.ProxyConfig, stsPort int, tokenM
return o, err
}
-func SetupSecurityOptions(proxyConfig *meshconfig.ProxyConfig, secOpt
*security.Options, jwtPolicy,
- credFetcherTypeEnv, credIdentityProvider string,
-) (*security.Options, error) {
- jwtPath := constants.ThirdPartyJwtPath
+func SetupSecurityOptions(proxyConfig *meshconfig.ProxyConfig, secOpt
*security.Options, jwtPolicy string) (*security.Options, error) {
switch jwtPolicy {
- case jwt.PolicyThirdParty:
- klog.Info("JWT policy is third-party-jwt")
- jwtPath = constants.ThirdPartyJwtPath
case jwt.PolicyFirstParty:
klog.Warningf("Using deprecated JWT policy 'first-party-jwt';
treating as 'third-party-jwt'")
- jwtPath = constants.ThirdPartyJwtPath
default:
klog.Info("Using existing certs")
}
-
o := secOpt
// If not set explicitly, default to the discovery address.
@@ -54,14 +43,6 @@ func SetupSecurityOptions(proxyConfig
*meshconfig.ProxyConfig, secOpt *security.
o.CAEndpointSAN = dubbodSAN.Get()
}
- o.CredIdentityProvider = credIdentityProvider
- credFetcher, err :=
credentialfetcher.NewCredFetcher(credFetcherTypeEnv, o.TrustDomain, jwtPath,
o.CredIdentityProvider)
- if err != nil {
- return nil, fmt.Errorf("failed to create credential fetcher:
%v", err)
- }
- klog.Infof("using credential fetcher of %s type in %s trust domain",
credFetcherTypeEnv, o.TrustDomain)
- o.CredFetcher = credFetcher
-
if o.ProvCert != "" && o.FileMountedCerts {
return nil, fmt.Errorf("invalid options: PROV_CERT and
FILE_MOUNTED_CERTS are mutually exclusive")
}
diff --git a/sail/pkg/bootstrap/configcontroller.go
b/sail/pkg/bootstrap/configcontroller.go
index af68ff89..c77a9825 100644
--- a/sail/pkg/bootstrap/configcontroller.go
+++ b/sail/pkg/bootstrap/configcontroller.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/dubbo-kubernetes/pkg/adsc"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collections"
configaggregate
"github.com/apache/dubbo-kubernetes/sail/pkg/config/aggregate"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/crdclient"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/kube/file"
"github.com/apache/dubbo-kubernetes/sail/pkg/config/memory"
dubboCredentials
"github.com/apache/dubbo-kubernetes/sail/pkg/credentials"
@@ -174,11 +175,24 @@ func (s *Server) initConfigSources(args *SailArgs) (err
error) {
return nil
}
+func (s *Server) makeKubeConfigController(args *SailArgs) *crdclient.Client {
+ opts := crdclient.Option{
+ DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
+ Identifier: "crd-controller",
+ KrtDebugger: args.KrtDebugger,
+ }
+
+ schemas := collections.Sail
+
+ return crdclient.NewForSchemas(s.kubeClient, opts, schemas)
+}
+
func (s *Server) initK8SConfigStore(args *SailArgs) error {
if s.kubeClient == nil {
return nil
}
- // TODO
+ configController := s.makeKubeConfigController(args)
+ s.ConfigStores = append(s.ConfigStores, configController)
return nil
}
diff --git a/sail/pkg/bootstrap/ca.go b/sail/pkg/bootstrap/dubbo_ca.go
similarity index 98%
rename from sail/pkg/bootstrap/ca.go
rename to sail/pkg/bootstrap/dubbo_ca.go
index 6b88a03e..08b5d6ae 100644
--- a/sail/pkg/bootstrap/ca.go
+++ b/sail/pkg/bootstrap/dubbo_ca.go
@@ -275,12 +275,6 @@ func (s *Server) createDubboCA(opts *caOptions)
(*ca.DubboCA, error) {
if useSelfSignedCA {
if features.UseCacertsForSelfSignedCA && dubboGenerated {
klog.Infof("DubboGenerated %s secret found, use it as
the CA certificate", ca.CACertsSecret)
-
- // TODO(jaellio): Currently, when the
USE_CACERTS_FOR_SELF_SIGNED_CA flag is true dubbod
- // handles loading and updating the "cacerts" secret
with the "dubbo-generated" key the
- // same way it handles the "dubbo-ca-secret" secret.
Isitod utilizes a secret watch instead
- // of file watch to check for secret updates. This may
change in the future, and dubbod
- // will watch the file mount instead.
}
// Either the secret is not mounted because it is named
`dubbo-ca-secret`,
diff --git a/sail/pkg/bootstrap/proxylessinjector.go
b/sail/pkg/bootstrap/proxylessinjector.go
index 6c35b198..e8f0e9df 100644
--- a/sail/pkg/bootstrap/proxylessinjector.go
+++ b/sail/pkg/bootstrap/proxylessinjector.go
@@ -39,7 +39,7 @@ func (s *Server) initProxylessInjector(args *SailArgs)
(*inject.Webhook, error)
return nil, err
}
} else if s.kubeClient != nil {
- configMapName := getInjectorConfigMapName("")
+ configMapName := getInjectorConfigMapName()
cms := s.kubeClient.Kube().CoreV1().ConfigMaps(args.Namespace)
if _, err := cms.Get(context.TODO(), configMapName,
metav1.GetOptions{}); err != nil {
if errors.IsNotFound(err) {
@@ -66,13 +66,9 @@ func (s *Server) initProxylessInjector(args *SailArgs)
(*inject.Webhook, error)
if err != nil {
return nil, fmt.Errorf("failed to create injection webhook:
%v", err)
}
- // Patch cert if a webhook config name is provided.
- // This requires RBAC permissions - a low-priv Istiod should not
attempt to patch but rely on
- // operator or CI/CD
+
if features.InjectionWebhookConfigName != "" {
s.addStartFunc("injection patcher", func(stop <-chan struct{})
error {
- // No leader election - different istiod revisions will
patch their own cert.
- // update webhook configuration by watching the cabundle
patcher, err :=
webhooks.NewWebhookCertPatcher(s.kubeClient, webhookName,
s.dubbodCertBundleWatcher)
if err != nil {
klog.Errorf("failed to create webhook cert
patcher: %v", err)
@@ -91,10 +87,7 @@ func (s *Server) initProxylessInjector(args *SailArgs)
(*inject.Webhook, error)
return wh, nil
}
-func getInjectorConfigMapName(revision string) string {
+func getInjectorConfigMapName() string {
name := defaultInjectorConfigMapName
- if revision == "" || revision == "default" {
- return name
- }
- return name + "-" + revision
+ return name
}
diff --git a/sail/pkg/bootstrap/server.go b/sail/pkg/bootstrap/server.go
index 5a427f50..927fabeb 100644
--- a/sail/pkg/bootstrap/server.go
+++ b/sail/pkg/bootstrap/server.go
@@ -147,15 +147,18 @@ func NewServer(args *SailArgs, initFuncs
...func(*Server)) (*Server, error) {
dubbodCertBundleWatcher: keycertbundle.NewWatcher(),
fileWatcher: filewatcher.NewWatcher(),
internalStop: make(chan struct{}),
+ readinessProbes: make(map[string]readinessProbe),
readinessFlags: &readinessFlags{},
webhookInfo: &webhookInfo{},
}
for _, fn := range initFuncs {
fn(s)
}
+
s.XDSServer = xds.NewDiscoveryServer(e,
args.RegistryOptions.KubeOptions.ClusterAliases, args.KrtDebugger)
- // TODO configGen
+ // TODO xds cache
// TODO initReadinessProbes
+
s.initServers(args)
if err := s.serveHTTP(); err != nil {
@@ -165,29 +168,25 @@ func NewServer(args *SailArgs, initFuncs
...func(*Server)) (*Server, error) {
if err := s.initKubeClient(args); err != nil {
return nil, fmt.Errorf("error initializing kube client: %v",
err)
}
+
s.initMeshConfiguration(args, s.fileWatcher)
if s.kubeClient != nil {
- // // Build a namespace watcher. This must have no filter,
since this is our input to the filter itself.
+ // Build a namespace watcher. This must have no filter, since
this is our input to the filter itself.
namespaces := kclient.New[*corev1.Namespace](s.kubeClient)
filter := namespace.NewDiscoveryNamespacesFilter(namespaces,
s.environment.Watcher, s.internalStop)
s.kubeClient = kubelib.SetObjectFilter(s.kubeClient, filter)
}
s.initMeshNetworks(args, s.fileWatcher)
- // TODO enovy proxy?
- // s.initMeshHandlers(configGen.MeshConfigChanged)
- // s.environment.Init()
- // if err := s.environment.InitNetworksManager(s.XDSServer); err != nil
{
- // return nil, err
- // }
-
- // TODO If enabled, mesh will support certificates signed by more than
one trustAnchor for DUBBO_MUTUAL mTLS
- // if features.MultiRootMesh {
- // // Initialize trust bundle after mesh config which it depends on
- // s.workloadTrustBundle = tb.NewTrustBundle(nil, e.Watcher)
- // e.TrustBundle = s.workloadTrustBundle
- // }
+ // TODO initMeshHandlers
+
+ s.environment.Init()
+ if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {
+ return nil, err
+ }
+
+ // TODO MultiRootMesh
// Options based on the current 'defaults' in dubbo.
caOpts := &caOptions{
@@ -339,7 +338,7 @@ func (s *Server) startCA(caOpts *caOptions) {
if s.secureGrpcServer == nil {
grpcServer = s.grpcServer
}
- klog.Infof("starting CA server")
+ klog.Infof("Starting CA server")
s.RunCA(grpcServer)
return nil
})
@@ -386,6 +385,7 @@ func (s *Server) initKubeClient(args *SailArgs) error {
if err != nil {
return fmt.Errorf("failed creating kube client: %v",
err)
}
+ s.kubeClient = kubelib.EnableCrdWatcher(s.kubeClient)
}
return nil
@@ -631,7 +631,7 @@ func (s *Server) initSDSServer() {
// isK8SSigning returns whether K8S (as a RA) is used to sign certs instead of
private keys known by Dubbod
func (s *Server) isK8SSigning() bool {
- return s.RA != nil && strings.HasPrefix(features.NaviCertProvider,
constants.CertProviderKubernetesSignerPrefix)
+ return s.RA != nil && strings.HasPrefix(features.SailCertProvider,
constants.CertProviderKubernetesSignerPrefix)
}
func (s *Server) waitForShutdown(stop <-chan struct{}) {
diff --git a/sail/pkg/bootstrap/validation.go b/sail/pkg/bootstrap/validation.go
index 7f7fd73a..5de2ae57 100644
--- a/sail/pkg/bootstrap/validation.go
+++ b/sail/pkg/bootstrap/validation.go
@@ -1,6 +1,9 @@
package bootstrap
import (
+ "github.com/apache/dubbo-kubernetes/pkg/webhooks/server"
+ "github.com/apache/dubbo-kubernetes/pkg/webhooks/validation/controller"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/features"
"k8s.io/klog/v2"
)
@@ -9,5 +12,23 @@ func (s *Server) initConfigValidation(args *SailArgs) error {
return nil
}
klog.Info("initializing config validator")
+ params := server.Options{
+ DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
+ Mux: s.httpsMux,
+ }
+ _, err := server.New(params)
+ if err != nil {
+ return err
+ }
+ s.readinessFlags.configValidationReady.Store(true)
+
+ if features.ValidationWebhookConfigName != "" && s.kubeClient != nil {
+ s.addStartFunc("validation controller", func(stop <-chan
struct{}) error {
+ klog.Infof("Starting validation controller")
+ go controller.NewValidatingWebhookController(
+ s.kubeClient, args.Namespace,
s.dubbodCertBundleWatcher).Run(stop)
+ return nil
+ })
+ }
return nil
}
diff --git a/sail/pkg/config/kube/crd/config.go
b/sail/pkg/config/kube/crd/config.go
index 7b963013..996c9c27 100644
--- a/sail/pkg/config/kube/crd/config.go
+++ b/sail/pkg/config/kube/crd/config.go
@@ -3,6 +3,7 @@ package crd
import (
"encoding/json"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
)
type DubboKind struct {
@@ -11,3 +12,47 @@ type DubboKind struct {
Spec json.RawMessage `json:"spec"`
Status *json.RawMessage `json:"status,omitempty"`
}
+
+func (in *DubboKind) DeepCopyObject() runtime.Object {
+ if c := in.DeepCopy(); c != nil {
+ return c
+ }
+
+ return nil
+}
+
+func (in *DubboKind) DeepCopy() *DubboKind {
+ if in == nil {
+ return nil
+ }
+ out := new(DubboKind)
+ in.DeepCopyInto(out)
+ return out
+}
+
+func (in *DubboKind) DeepCopyInto(out *DubboKind) {
+ *out = *in
+ out.TypeMeta = in.TypeMeta
+ in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ out.Spec = in.Spec
+ out.Status = in.Status
+}
+
+func (in *DubboKind) GetObjectMeta() metav1.ObjectMeta {
+ return in.ObjectMeta
+}
+
+func (in *DubboKind) GetSpec() json.RawMessage {
+ return in.Spec
+}
+
+func (in *DubboKind) GetStatus() *json.RawMessage {
+ return in.Status
+}
+
+type DubboObject interface {
+ runtime.Object
+ GetSpec() json.RawMessage
+ GetStatus() *json.RawMessage
+ GetObjectMeta() metav1.ObjectMeta
+}
diff --git a/sail/pkg/config/kube/crd/conversion.go
b/sail/pkg/config/kube/crd/conversion.go
index 66b12cb7..8cf499e1 100644
--- a/sail/pkg/config/kube/crd/conversion.go
+++ b/sail/pkg/config/kube/crd/conversion.go
@@ -2,21 +2,18 @@ package crd
import (
"bytes"
+ "encoding/json"
"fmt"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
"io"
kubeyaml "k8s.io/apimachinery/pkg/util/yaml"
+ "k8s.io/klog/v2"
"reflect"
)
type ConversionFunc = func(s resource.Schema, js string) (config.Spec, error)
-// TODO - add special cases for type-to-kind and kind-to-type
-// conversions with initial-isms. Consider adding additional type
-// information to the abstract model and/or elevating k8s
-// representation to first-class type to avoid extra conversions.
-
func parseInputsImpl(inputs string, withValidate bool) ([]config.Config,
[]DubboKind, error) {
var varr []config.Config
var others []DubboKind
@@ -44,14 +41,71 @@ func parseInputsImpl(inputs string, withValidate bool)
([]config.Config, []Dubbo
return varr, others, nil
}
-// ParseInputs reads multiple documents from `kubectl` output and checks with
-// the schema. It also returns the list of unrecognized kinds as the second
-// response.
-//
-// NOTE: This function only decodes a subset of the complete k8s
-// ObjectMeta as identified by the fields in model.Meta. This
-// would typically only be a problem if a user dumps an configuration
-// object with kubectl and then re-ingests it.
func ParseInputs(inputs string) ([]config.Config, []DubboKind, error) {
return parseInputsImpl(inputs, true)
}
+
+func FromJSON(s resource.Schema, js string) (config.Spec, error) {
+ c, err := s.NewInstance()
+ if err != nil {
+ return nil, err
+ }
+ if err = config.ApplyJSON(c, js); err != nil {
+ return nil, err
+ }
+ return c, nil
+}
+
+func ConvertObject(schema resource.Schema, object DubboObject, domain string)
(*config.Config, error) {
+ return ConvertObjectInternal(schema, object, domain, FromJSON)
+}
+
+func StatusJSONFromMap(schema resource.Schema, jsonMap *json.RawMessage)
(config.Status, error) {
+ if jsonMap == nil {
+ return nil, nil
+ }
+ js, err := json.Marshal(jsonMap)
+ if err != nil {
+ return nil, err
+ }
+ status, err := schema.Status()
+ if err != nil {
+ return nil, err
+ }
+ err = json.Unmarshal(js, status)
+ if err != nil {
+ return nil, err
+ }
+ return status, nil
+}
+
+func ConvertObjectInternal(schema resource.Schema, object DubboObject, domain
string, convert ConversionFunc) (*config.Config, error) {
+ js, err := json.Marshal(object.GetSpec())
+ if err != nil {
+ return nil, err
+ }
+ spec, err := convert(schema, string(js))
+ if err != nil {
+ return nil, err
+ }
+ status, err := StatusJSONFromMap(schema, object.GetStatus())
+ if err != nil {
+ klog.Errorf("could not get istio status from map %v, err %v",
object.GetStatus(), err)
+ }
+ meta := object.GetObjectMeta()
+
+ return &config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: schema.GroupVersionKind(),
+ Name: meta.Name,
+ Namespace: meta.Namespace,
+ Domain: domain,
+ Labels: meta.Labels,
+ Annotations: meta.Annotations,
+ ResourceVersion: meta.ResourceVersion,
+ CreationTimestamp: meta.CreationTimestamp.Time,
+ },
+ Spec: spec,
+ Status: status,
+ }, nil
+}
diff --git a/sail/pkg/config/kube/crdclient/client.go
b/sail/pkg/config/kube/crdclient/client.go
index da7070cb..da1ebc19 100644
--- a/sail/pkg/config/kube/crdclient/client.go
+++ b/sail/pkg/config/kube/crdclient/client.go
@@ -1,3 +1,361 @@
package crdclient
-type Client struct{}
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/resource"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/controllers"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kclient"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/krt"
+ "github.com/apache/dubbo-kubernetes/pkg/kube/kubetypes"
+ "github.com/apache/dubbo-kubernetes/pkg/maps"
+ "github.com/apache/dubbo-kubernetes/sail/pkg/model"
+ jsonmerge "github.com/evanphx/json-patch/v5"
+ "go.uber.org/atomic"
+ "gomodules.xyz/jsonpatch/v2"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/klog/v2"
+ "sync"
+ "time"
+)
+
+type Client struct {
+ started *atomic.Bool
+ stop chan struct{}
+ kinds map[config.GroupVersionKind]nsStore
+ kindsMu sync.RWMutex
+ domainSuffix string
+ schemasByCRDName map[string]resource.Schema
+ schemas collection.Schemas
+ client kube.Client
+ filtersByGVK map[config.GroupVersionKind]kubetypes.Filter
+}
+
+var _ model.ConfigStoreController = &Client{}
+
+type nsStore struct {
+ collection krt.Collection[config.Config]
+ index krt.Index[string, config.Config]
+ handlers []krt.HandlerRegistration
+}
+
+type Option struct {
+ DomainSuffix string
+ Identifier string
+ FiltersByGVK map[config.GroupVersionKind]kubetypes.Filter
+ KrtDebugger *krt.DebugHandler
+}
+
+func NewForSchemas(client kube.Client, opts Option, schemas
collection.Schemas) *Client {
+ schemasByCRDName := map[string]resource.Schema{}
+ for _, s := range schemas.All() {
+ // From the spec: "Its name MUST be in the format
<.spec.name>.<.spec.group>."
+ name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
+ schemasByCRDName[name] = s
+ }
+
+ stop := make(chan struct{})
+
+ out := &Client{
+ domainSuffix: opts.DomainSuffix,
+ schemas: schemas,
+ schemasByCRDName: schemasByCRDName,
+ started: atomic.NewBool(false),
+ kinds: map[config.GroupVersionKind]nsStore{},
+ client: client,
+ filtersByGVK: opts.FiltersByGVK,
+ stop: stop,
+ }
+
+ kopts := krt.NewOptionsBuilder(stop, "crdclient", opts.KrtDebugger)
+ for _, s := range out.schemas.All() {
+ // From the spec: "Its name MUST be in the format
<.spec.name>.<.spec.group>."
+ name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())
+ out.addCRD(name, kopts)
+ }
+
+ return out
+}
+
+func (cl *Client) Run(stop <-chan struct{}) {
+ if cl.started.Swap(true) {
+ // was already started by other thread
+ return
+ }
+
+ t0 := time.Now()
+ klog.Info("Starting Sail Kubernetes CRD controller")
+ if !kube.WaitForCacheSync("crdclient", stop, cl.informerSynced) {
+ klog.Errorf("Failed to sync Sail Kubernetes CRD controller
cache")
+ } else {
+ klog.Infof("Sail Kubernetes CRD controller synced in %v",
time.Since(t0))
+ }
+ <-stop
+ close(cl.stop)
+ klog.Info("controller terminated")
+}
+
+func (cl *Client) HasSynced() bool {
+ for _, ctl := range cl.allKinds() {
+ if !ctl.collection.HasSynced() {
+ return false
+ }
+
+ for _, h := range ctl.handlers {
+ if !h.HasSynced() {
+ return false
+ }
+ }
+ }
+
+ return true
+}
+
+func (cl *Client) informerSynced() bool {
+ for gk, ctl := range cl.allKinds() {
+ if !ctl.collection.HasSynced() {
+ klog.Infof("controller %q is syncing...", gk)
+ return false
+ }
+ }
+ return true
+}
+
+func (cl *Client) allKinds() map[config.GroupVersionKind]nsStore {
+ cl.kindsMu.RLock()
+ defer cl.kindsMu.RUnlock()
+ return maps.Clone(cl.kinds)
+}
+
+func getObjectMetadata(config config.Config) metav1.ObjectMeta {
+ return metav1.ObjectMeta{
+ Name: config.Name,
+ Namespace: config.Namespace,
+ Labels: config.Labels,
+ Annotations: config.Annotations,
+ ResourceVersion: config.ResourceVersion,
+ OwnerReferences: config.OwnerReferences,
+ UID: types.UID(config.UID),
+ }
+}
+
+func genPatchBytes(oldRes, modRes runtime.Object, patchType types.PatchType)
([]byte, error) {
+ oldJSON, err := json.Marshal(oldRes)
+ if err != nil {
+ return nil, fmt.Errorf("failed marhsalling original resource:
%v", err)
+ }
+ newJSON, err := json.Marshal(modRes)
+ if err != nil {
+ return nil, fmt.Errorf("failed marhsalling modified resource:
%v", err)
+ }
+ switch patchType {
+ case types.JSONPatchType:
+ ops, err := jsonpatch.CreatePatch(oldJSON, newJSON)
+ if err != nil {
+ return nil, err
+ }
+ return json.Marshal(ops)
+ case types.MergePatchType:
+ return jsonmerge.CreateMergePatch(oldJSON, newJSON)
+ default:
+ return nil, fmt.Errorf("unsupported patch type: %v. must be one
of JSONPatchType or MergePatchType", patchType)
+ }
+}
+
+func (cl *Client) addCRD(name string, opts krt.OptionsBuilder) {
+ klog.V(2).Infof("adding CRD %q", name)
+ s, f := cl.schemasByCRDName[name]
+ if !f {
+ klog.V(2).Infof("added resource that we are not watching: %v",
name)
+ return
+ }
+ resourceGVK := s.GroupVersionKind()
+ gvr := s.GroupVersionResource()
+
+ cl.kindsMu.Lock()
+ defer cl.kindsMu.Unlock()
+ if _, f := cl.kinds[resourceGVK]; f {
+ klog.V(2).Infof("added resource that already exists: %v",
resourceGVK)
+ return
+ }
+ translateFunc, f := translationMap[resourceGVK]
+ if !f {
+ klog.Errorf("translation function for %v not found",
resourceGVK)
+ return
+ }
+
+ var extraFilter func(obj any) bool
+ var transform func(obj any) (any, error)
+ var fieldSelector string
+ if of, f := cl.filtersByGVK[resourceGVK]; f {
+ if of.ObjectFilter != nil {
+ extraFilter = of.ObjectFilter.Filter
+ }
+ if of.ObjectTransform != nil {
+ transform = of.ObjectTransform
+ }
+ fieldSelector = of.FieldSelector
+ }
+
+ var namespaceFilter kubetypes.DynamicObjectFilter
+ if !s.IsClusterScoped() {
+ namespaceFilter = cl.client.ObjectFilter()
+ }
+
+ filter := kubetypes.Filter{
+ ObjectFilter: kubetypes.ComposeFilters(namespaceFilter,
extraFilter),
+ ObjectTransform: transform,
+ FieldSelector: fieldSelector,
+ }
+
+ var kc kclient.Untyped
+ if s.IsBuiltin() {
+ kc = kclient.NewUntypedInformer(cl.client, gvr, filter)
+ } else {
+ kc = kclient.NewDelayedInformer[controllers.Object](
+ cl.client,
+ gvr,
+ kubetypes.StandardInformer,
+ filter,
+ )
+ }
+
+ wrappedClient := krt.WrapClient(kc,
opts.WithName("informer/"+resourceGVK.Kind)...)
+ collection := krt.MapCollection(wrappedClient, func(obj
controllers.Object) config.Config {
+ cfg := translateFunc(obj)
+ cfg.Domain = cl.domainSuffix
+ return cfg
+ }, opts.WithName("collection/"+resourceGVK.Kind)...)
+ index := krt.NewNamespaceIndex(collection)
+ cl.kinds[resourceGVK] = nsStore{
+ collection: collection,
+ index: index,
+ handlers: []krt.HandlerRegistration{
+ collection.RegisterBatch(func(o
[]krt.Event[config.Config]) {
+ }, false),
+ },
+ }
+}
+
+func (cl *Client) kind(r config.GroupVersionKind) (nsStore, bool) {
+ cl.kindsMu.RLock()
+ defer cl.kindsMu.RUnlock()
+ ch, ok := cl.kinds[r]
+ return ch, ok
+}
+
+func (cl *Client) Schemas() collection.Schemas {
+ return cl.schemas
+}
+
+func (cl *Client) Get(typ config.GroupVersionKind, name, namespace string)
*config.Config {
+ h, f := cl.kind(typ)
+ if !f {
+ klog.Warningf("unknown type: %s", typ)
+ return nil
+ }
+
+ var key string
+ if namespace == "" {
+ key = name
+ } else {
+ key = namespace + "/" + name
+ }
+
+ obj := h.collection.GetKey(key)
+ if obj == nil {
+ klog.V(2).Infof("couldn't find %s/%s in informer index",
namespace, name)
+ return nil
+ }
+
+ return obj
+}
+
+func (cl *Client) Create(cfg config.Config) (string, error) {
+ if cfg.Spec == nil {
+ return "", fmt.Errorf("nil spec for %v/%v", cfg.Name,
cfg.Namespace)
+ }
+
+ meta, err := create(cl.client, cfg, getObjectMetadata(cfg))
+ if err != nil {
+ return "", err
+ }
+ return meta.GetResourceVersion(), nil
+}
+
+func (cl *Client) Update(cfg config.Config) (string, error) {
+ if cfg.Spec == nil {
+ return "", fmt.Errorf("nil spec for %v/%v", cfg.Name,
cfg.Namespace)
+ }
+
+ meta, err := update(cl.client, cfg, getObjectMetadata(cfg))
+ if err != nil {
+ return "", err
+ }
+ return meta.GetResourceVersion(), nil
+}
+
+func (cl *Client) UpdateStatus(cfg config.Config) (string, error) {
+ if cfg.Status == nil {
+ return "", fmt.Errorf("nil status for %v/%v on updateStatus()",
cfg.Name, cfg.Namespace)
+ }
+
+ meta, err := updateStatus(cl.client, cfg, getObjectMetadata(cfg))
+ if err != nil {
+ return "", err
+ }
+ return meta.GetResourceVersion(), nil
+}
+
+func (cl *Client) Patch(orig config.Config, patchFn config.PatchFunc) (string,
error) {
+ modified, patchType := patchFn(orig.DeepCopy())
+
+ meta, err := patch(cl.client, orig, getObjectMetadata(orig), modified,
getObjectMetadata(modified), patchType)
+ if err != nil {
+ return "", err
+ }
+ return meta.GetResourceVersion(), nil
+}
+
+func (cl *Client) Delete(typ config.GroupVersionKind, name, namespace string,
resourceVersion *string) error {
+ return delete(cl.client, typ, name, namespace, resourceVersion)
+}
+
+// List implements store interface
+func (cl *Client) List(kind config.GroupVersionKind, namespace string)
[]config.Config {
+ h, f := cl.kind(kind)
+ if !f {
+ return nil
+ }
+
+ if namespace == metav1.NamespaceAll {
+ return h.collection.List()
+ }
+
+ return h.index.Lookup(namespace)
+}
+
+func (cl *Client) RegisterEventHandler(kind config.GroupVersionKind, handler
model.EventHandler) {
+ if c, ok := cl.kind(kind); ok {
+ c.handlers = append(c.handlers,
c.collection.RegisterBatch(func(o []krt.Event[config.Config]) {
+ for _, event := range o {
+ switch event.Event {
+ case controllers.EventAdd:
+ handler(config.Config{}, *event.New,
model.Event(event.Event))
+ case controllers.EventUpdate:
+ handler(*event.Old, *event.New,
model.Event(event.Event))
+ case controllers.EventDelete:
+ handler(config.Config{}, *event.Old,
model.Event(event.Event))
+ }
+ }
+ }, false))
+ return
+ }
+
+ klog.Warningf("unknown type: %s", kind)
+}
diff --git a/sail/pkg/config/kube/crdclient/types.go
b/sail/pkg/config/kube/crdclient/types.go
new file mode 100644
index 00000000..7bf590f5
--- /dev/null
+++ b/sail/pkg/config/kube/crdclient/types.go
@@ -0,0 +1,520 @@
+package crdclient
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config"
+ "github.com/apache/dubbo-kubernetes/pkg/config/schema/gvk"
+ "github.com/apache/dubbo-kubernetes/pkg/kube"
+ istioioapimetav1alpha1 "istio.io/api/meta/v1alpha1"
+ istioioapinetworkingv1alpha3 "istio.io/api/networking/v1alpha3"
+ istioioapisecurityv1beta1 "istio.io/api/security/v1beta1"
+ apiistioioapinetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
+ apiistioioapisecurityv1 "istio.io/client-go/pkg/apis/security/v1"
+ k8sioapiadmissionregistrationv1 "k8s.io/api/admissionregistration/v1"
+ k8sioapiappsv1 "k8s.io/api/apps/v1"
+ k8sioapicorev1 "k8s.io/api/core/v1"
+ k8sioapiextensionsapiserverpkgapisapiextensionsv1
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+)
+
+func create(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta)
(metav1.Object, error) {
+ switch cfg.GroupVersionKind {
+ case gvk.AuthorizationPolicy:
+ return
c.Dubbo().SecurityV1().AuthorizationPolicies(cfg.Namespace).Create(context.TODO(),
&apiistioioapisecurityv1.AuthorizationPolicy{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapisecurityv1beta1.AuthorizationPolicy)),
+ }, metav1.CreateOptions{})
+ case gvk.DestinationRule:
+ return
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Create(context.TODO(),
&apiistioioapinetworkingv1.DestinationRule{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)),
+ }, metav1.CreateOptions{})
+ case gvk.PeerAuthentication:
+ return
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Create(context.TODO(),
&apiistioioapisecurityv1.PeerAuthentication{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)),
+ }, metav1.CreateOptions{})
+ case gvk.RequestAuthentication:
+ return
c.Dubbo().SecurityV1().RequestAuthentications(cfg.Namespace).Create(context.TODO(),
&apiistioioapisecurityv1.RequestAuthentication{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapisecurityv1beta1.RequestAuthentication)),
+ }, metav1.CreateOptions{})
+ case gvk.VirtualService:
+ return
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Create(context.TODO(),
&apiistioioapinetworkingv1.VirtualService{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)),
+ }, metav1.CreateOptions{})
+ default:
+ return nil, fmt.Errorf("unsupported type: %v",
cfg.GroupVersionKind)
+ }
+}
+
+func update(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta)
(metav1.Object, error) {
+ switch cfg.GroupVersionKind {
+ case gvk.AuthorizationPolicy:
+ return
c.Dubbo().SecurityV1().AuthorizationPolicies(cfg.Namespace).Update(context.TODO(),
&apiistioioapisecurityv1.AuthorizationPolicy{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapisecurityv1beta1.AuthorizationPolicy)),
+ }, metav1.UpdateOptions{})
+ case gvk.DestinationRule:
+ return
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).Update(context.TODO(),
&apiistioioapinetworkingv1.DestinationRule{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)),
+ }, metav1.UpdateOptions{})
+ case gvk.PeerAuthentication:
+ return
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).Update(context.TODO(),
&apiistioioapisecurityv1.PeerAuthentication{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)),
+ }, metav1.UpdateOptions{})
+ case gvk.RequestAuthentication:
+ return
c.Dubbo().SecurityV1().RequestAuthentications(cfg.Namespace).Update(context.TODO(),
&apiistioioapisecurityv1.RequestAuthentication{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapisecurityv1beta1.RequestAuthentication)),
+ }, metav1.UpdateOptions{})
+ case gvk.VirtualService:
+ return
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).Update(context.TODO(),
&apiistioioapinetworkingv1.VirtualService{
+ ObjectMeta: objMeta,
+ Spec:
*(cfg.Spec.(*istioioapinetworkingv1alpha3.VirtualService)),
+ }, metav1.UpdateOptions{})
+ default:
+ return nil, fmt.Errorf("unsupported type: %v",
cfg.GroupVersionKind)
+ }
+}
+
+func updateStatus(c kube.Client, cfg config.Config, objMeta metav1.ObjectMeta)
(metav1.Object, error) {
+ switch cfg.GroupVersionKind {
+ case gvk.AuthorizationPolicy:
+ return
c.Dubbo().SecurityV1().AuthorizationPolicies(cfg.Namespace).UpdateStatus(context.TODO(),
&apiistioioapisecurityv1.AuthorizationPolicy{
+ ObjectMeta: objMeta,
+ Status:
*(cfg.Status.(*istioioapimetav1alpha1.IstioStatus)),
+ }, metav1.UpdateOptions{})
+ case gvk.DestinationRule:
+ return
c.Dubbo().NetworkingV1().DestinationRules(cfg.Namespace).UpdateStatus(context.TODO(),
&apiistioioapinetworkingv1.DestinationRule{
+ ObjectMeta: objMeta,
+ Status:
*(cfg.Status.(*istioioapimetav1alpha1.IstioStatus)),
+ }, metav1.UpdateOptions{})
+ case gvk.PeerAuthentication:
+ return
c.Dubbo().SecurityV1().PeerAuthentications(cfg.Namespace).UpdateStatus(context.TODO(),
&apiistioioapisecurityv1.PeerAuthentication{
+ ObjectMeta: objMeta,
+ Status:
*(cfg.Status.(*istioioapimetav1alpha1.IstioStatus)),
+ }, metav1.UpdateOptions{})
+ case gvk.RequestAuthentication:
+ return
c.Dubbo().SecurityV1().RequestAuthentications(cfg.Namespace).UpdateStatus(context.TODO(),
&apiistioioapisecurityv1.RequestAuthentication{
+ ObjectMeta: objMeta,
+ Status:
*(cfg.Status.(*istioioapimetav1alpha1.IstioStatus)),
+ }, metav1.UpdateOptions{})
+ case gvk.VirtualService:
+ return
c.Dubbo().NetworkingV1().VirtualServices(cfg.Namespace).UpdateStatus(context.TODO(),
&apiistioioapinetworkingv1.VirtualService{
+ ObjectMeta: objMeta,
+ Status:
*(cfg.Status.(*istioioapimetav1alpha1.IstioStatus)),
+ }, metav1.UpdateOptions{})
+ default:
+ return nil, fmt.Errorf("unsupported type: %v",
cfg.GroupVersionKind)
+ }
+}
+
+func patch(c kube.Client, orig config.Config, origMeta metav1.ObjectMeta, mod
config.Config, modMeta metav1.ObjectMeta, typ types.PatchType) (metav1.Object,
error) {
+ if orig.GroupVersionKind != mod.GroupVersionKind {
+ return nil, fmt.Errorf("gvk mismatch: %v, modified: %v",
orig.GroupVersionKind, mod.GroupVersionKind)
+ }
+ switch orig.GroupVersionKind {
+ case gvk.AuthorizationPolicy:
+ oldRes := &apiistioioapisecurityv1.AuthorizationPolicy{
+ ObjectMeta: origMeta,
+ Spec:
*(orig.Spec.(*istioioapisecurityv1beta1.AuthorizationPolicy)),
+ }
+ modRes := &apiistioioapisecurityv1.AuthorizationPolicy{
+ ObjectMeta: modMeta,
+ Spec:
*(mod.Spec.(*istioioapisecurityv1beta1.AuthorizationPolicy)),
+ }
+ patchBytes, err := genPatchBytes(oldRes, modRes, typ)
+ if err != nil {
+ return nil, err
+ }
+ return
c.Dubbo().SecurityV1().AuthorizationPolicies(orig.Namespace).
+ Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "pilot-discovery"})
+ case gvk.DestinationRule:
+ oldRes := &apiistioioapinetworkingv1.DestinationRule{
+ ObjectMeta: origMeta,
+ Spec:
*(orig.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)),
+ }
+ modRes := &apiistioioapinetworkingv1.DestinationRule{
+ ObjectMeta: modMeta,
+ Spec:
*(mod.Spec.(*istioioapinetworkingv1alpha3.DestinationRule)),
+ }
+ patchBytes, err := genPatchBytes(oldRes, modRes, typ)
+ if err != nil {
+ return nil, err
+ }
+ return
c.Dubbo().NetworkingV1().DestinationRules(orig.Namespace).
+ Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "pilot-discovery"})
+ case gvk.PeerAuthentication:
+ oldRes := &apiistioioapisecurityv1.PeerAuthentication{
+ ObjectMeta: origMeta,
+ Spec:
*(orig.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)),
+ }
+ modRes := &apiistioioapisecurityv1.PeerAuthentication{
+ ObjectMeta: modMeta,
+ Spec:
*(mod.Spec.(*istioioapisecurityv1beta1.PeerAuthentication)),
+ }
+ patchBytes, err := genPatchBytes(oldRes, modRes, typ)
+ if err != nil {
+ return nil, err
+ }
+ return
c.Dubbo().SecurityV1().PeerAuthentications(orig.Namespace).
+ Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "pilot-discovery"})
+ case gvk.RequestAuthentication:
+ oldRes := &apiistioioapisecurityv1.RequestAuthentication{
+ ObjectMeta: origMeta,
+ Spec:
*(orig.Spec.(*istioioapisecurityv1beta1.RequestAuthentication)),
+ }
+ modRes := &apiistioioapisecurityv1.RequestAuthentication{
+ ObjectMeta: modMeta,
+ Spec:
*(mod.Spec.(*istioioapisecurityv1beta1.RequestAuthentication)),
+ }
+ patchBytes, err := genPatchBytes(oldRes, modRes, typ)
+ if err != nil {
+ return nil, err
+ }
+ return
c.Dubbo().SecurityV1().RequestAuthentications(orig.Namespace).
+ Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "pilot-discovery"})
+ case gvk.VirtualService:
+ oldRes := &apiistioioapinetworkingv1.VirtualService{
+ ObjectMeta: origMeta,
+ Spec:
*(orig.Spec.(*istioioapinetworkingv1alpha3.VirtualService)),
+ }
+ modRes := &apiistioioapinetworkingv1.VirtualService{
+ ObjectMeta: modMeta,
+ Spec:
*(mod.Spec.(*istioioapinetworkingv1alpha3.VirtualService)),
+ }
+ patchBytes, err := genPatchBytes(oldRes, modRes, typ)
+ if err != nil {
+ return nil, err
+ }
+ return c.Dubbo().NetworkingV1().VirtualServices(orig.Namespace).
+ Patch(context.TODO(), orig.Name, typ, patchBytes,
metav1.PatchOptions{FieldManager: "pilot-discovery"})
+ default:
+ return nil, fmt.Errorf("unsupported type: %v",
orig.GroupVersionKind)
+ }
+}
+
+func delete(c kube.Client, typ config.GroupVersionKind, name, namespace
string, resourceVersion *string) error {
+ var deleteOptions metav1.DeleteOptions
+ if resourceVersion != nil {
+ deleteOptions.Preconditions =
&metav1.Preconditions{ResourceVersion: resourceVersion}
+ }
+ switch typ {
+ case gvk.AuthorizationPolicy:
+ return
c.Dubbo().SecurityV1().AuthorizationPolicies(namespace).Delete(context.TODO(),
name, deleteOptions)
+ case gvk.DestinationRule:
+ return
c.Dubbo().NetworkingV1().DestinationRules(namespace).Delete(context.TODO(),
name, deleteOptions)
+ case gvk.PeerAuthentication:
+ return
c.Dubbo().SecurityV1().PeerAuthentications(namespace).Delete(context.TODO(),
name, deleteOptions)
+ case gvk.RequestAuthentication:
+ return
c.Dubbo().SecurityV1().RequestAuthentications(namespace).Delete(context.TODO(),
name, deleteOptions)
+ case gvk.VirtualService:
+ return
c.Dubbo().NetworkingV1().VirtualServices(namespace).Delete(context.TODO(),
name, deleteOptions)
+ default:
+ return fmt.Errorf("unsupported type: %v", typ)
+ }
+}
+
+var translationMap = map[config.GroupVersionKind]func(r runtime.Object)
config.Config{
+ gvk.AuthorizationPolicy: func(r runtime.Object) config.Config {
+ obj := r.(*apiistioioapisecurityv1.AuthorizationPolicy)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.AuthorizationPolicy,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ Status: &obj.Status,
+ }
+ },
+ gvk.ConfigMap: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapicorev1.ConfigMap)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.ConfigMap,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: obj,
+ }
+ },
+ gvk.CustomResourceDefinition: func(r runtime.Object) config.Config {
+ obj :=
r.(*k8sioapiextensionsapiserverpkgapisapiextensionsv1.CustomResourceDefinition)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.CustomResourceDefinition,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ }
+ },
+ gvk.DaemonSet: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapiappsv1.DaemonSet)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.DaemonSet,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ }
+ },
+ gvk.Deployment: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapiappsv1.Deployment)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.Deployment,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ }
+ },
+ gvk.DestinationRule: func(r runtime.Object) config.Config {
+ obj := r.(*apiistioioapinetworkingv1.DestinationRule)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.DestinationRule,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ Status: &obj.Status,
+ }
+ },
+ gvk.MutatingWebhookConfiguration: func(r runtime.Object) config.Config {
+ obj :=
r.(*k8sioapiadmissionregistrationv1.MutatingWebhookConfiguration)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind:
gvk.MutatingWebhookConfiguration,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: obj,
+ }
+ },
+ gvk.Namespace: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapicorev1.Namespace)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.Namespace,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ }
+ },
+ gvk.PeerAuthentication: func(r runtime.Object) config.Config {
+ obj := r.(*apiistioioapisecurityv1.PeerAuthentication)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.PeerAuthentication,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ Status: &obj.Status,
+ }
+ },
+ gvk.RequestAuthentication: func(r runtime.Object) config.Config {
+ obj := r.(*apiistioioapisecurityv1.RequestAuthentication)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.RequestAuthentication,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ Status: &obj.Status,
+ }
+ },
+ gvk.Secret: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapicorev1.Secret)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.Secret,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: obj,
+ }
+ },
+ gvk.Service: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapicorev1.Service)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.Service,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ Status: &obj.Status,
+ }
+ },
+ gvk.ServiceAccount: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapicorev1.ServiceAccount)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.ServiceAccount,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: obj,
+ }
+ },
+ gvk.StatefulSet: func(r runtime.Object) config.Config {
+ obj := r.(*k8sioapiappsv1.StatefulSet)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.StatefulSet,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ }
+ },
+ gvk.ValidatingWebhookConfiguration: func(r runtime.Object)
config.Config {
+ obj :=
r.(*k8sioapiadmissionregistrationv1.ValidatingWebhookConfiguration)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind:
gvk.ValidatingWebhookConfiguration,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: obj,
+ }
+ },
+ gvk.VirtualService: func(r runtime.Object) config.Config {
+ obj := r.(*apiistioioapinetworkingv1.VirtualService)
+ return config.Config{
+ Meta: config.Meta{
+ GroupVersionKind: gvk.VirtualService,
+ Name: obj.Name,
+ Namespace: obj.Namespace,
+ Labels: obj.Labels,
+ Annotations: obj.Annotations,
+ ResourceVersion: obj.ResourceVersion,
+ CreationTimestamp: obj.CreationTimestamp.Time,
+ OwnerReferences: obj.OwnerReferences,
+ UID: string(obj.UID),
+ Generation: obj.Generation,
+ },
+ Spec: &obj.Spec,
+ Status: &obj.Status,
+ }
+ },
+}
diff --git a/sail/pkg/config/kube/file/controller.go
b/sail/pkg/config/kube/file/controller.go
index 1a4ffa2a..0a1ec6c9 100644
--- a/sail/pkg/config/kube/file/controller.go
+++ b/sail/pkg/config/kube/file/controller.go
@@ -45,12 +45,7 @@ func (c ConfigKind) Equals(other ConfigKind) bool {
return c.Config.Equals(other.Config)
}
-func NewController(
- fileDir string,
- domainSuffix string,
- schemas collection.Schemas,
- options kubecontroller.Options,
-) (*Controller, error) {
+func NewController(fileDir string, domainSuffix string, schemas
collection.Schemas, options kubecontroller.Options) (*Controller, error) {
stop := make(chan struct{})
opts := krt.NewOptionsBuilder(stop, "file-monitor", options.KrtDebugger)
watch, err := krtfiles.NewFolderWatch(fileDir, func(b []byte)
([]*config.Config, error) {
diff --git a/sail/pkg/features/sail.go b/sail/pkg/features/sail.go
index f0a22a5c..7f4c15c0 100644
--- a/sail/pkg/features/sail.go
+++ b/sail/pkg/features/sail.go
@@ -23,20 +23,13 @@ import (
)
var (
+ ValidationWebhookConfigName =
env.Register("VALIDATION_WEBHOOK_CONFIG_NAME", "dubbo-dubbo-system",
+ "If not empty, the controller will automatically patch
validatingwebhookconfiguration when the CA certificate changes. "+
+ "Only works in kubernetes environment.").Get()
SharedMeshConfig = env.Register("SHARED_MESH_CONFIG", "",
"Additional config map to load for shared MeshConfig settings.
The standard mesh config will take precedence.").Get()
- EnableUnsafeAssertions = env.Register(
- "UNSAFE_NAVIGATOR_ENABLE_RUNTIME_ASSERTIONS",
- false,
- "If enabled, addition runtime asserts will be performed. "+
- "These checks are both expensive and panic on failure.
As a result, this should be used only for testing.",
- ).Get()
MultiRootMesh = env.Register("DUBBO_MULTIROOT_MESH", false,
"If enabled, mesh will support certificates signed by more than
one trustAnchor for DUBBO_MUTUAL mTLS").Get()
- NaviCertProvider = env.Register("PILOT_CERT_PROVIDER",
constants.CertProviderDubbod,
- "The provider of Navi DNS certificate. K8S RA will be used for
k8s.io/NAME. 'dubbod' value will sign"+
- " using Navi build in CA. Other values will not not
generate TLS certs, but still "+
- " distribute ./etc/certs/root-cert.pem. Only used if
custom certificates are not mounted.").Get()
InformerWatchNamespace = env.Register("DUBBO_WATCH_NAMESPACE", "",
"If set, limit Kubernetes watches to a single namespace. "+
"Warning: only a single namespace can be set.").Get()
@@ -50,7 +43,7 @@ var (
"If this is set to false, will not create CA server in
dubbod.").Get()
// EnableCACRL ToDo (nilekh): remove this feature flag once it's stable
EnableCACRL = env.Register(
- "SHIP_ENABLE_CA_CRL",
+ "SAIL_ENABLE_CA_CRL",
true, // Default value (true = feature enabled by default)
"If set to false, Dubbo will not watch for the ca-crl.pem file
in the /etc/cacerts directory "+
"and will not distribute CRL data to namespaces for
proxies to consume.",
@@ -62,6 +55,6 @@ var (
DubbodServiceCustomHost = env.Register("DUBBOD_CUSTOM_HOST", "",
"Custom host name of dubbod that dubbod signs the server cert.
"+
"Multiple custom host names are supported, and multiple
values are separated by commas.").Get()
- InjectionWebhookConfigName =
env.Register("INJECTION_WEBHOOK_CONFIG_NAME", "istio-sidecar-injector",
- "Name of the mutatingwebhookconfiguration to patch, if istioctl
is not used.").Get()
+ InjectionWebhookConfigName =
env.Register("INJECTION_WEBHOOK_CONFIG_NAME", "dubbo-proxyless-injector",
+ "Name of the mutatingwebhookconfiguration to patch, if dubboctl
is not used.").Get()
)
diff --git a/sail/pkg/model/cluster_local.go b/sail/pkg/model/cluster_local.go
index 9bb6b429..2db3003e 100644
--- a/sail/pkg/model/cluster_local.go
+++ b/sail/pkg/model/cluster_local.go
@@ -1,6 +1,16 @@
package model
-import "github.com/apache/dubbo-kubernetes/pkg/config/host"
+import (
+ "github.com/apache/dubbo-kubernetes/pkg/config/host"
+ "k8s.io/klog/v2"
+ "strings"
+ "sync"
+)
+
+var (
+ defaultClusterLocalNamespaces = []string{"kube-system"}
+ defaultClusterLocalServices = []string{"kubernetes.default.svc"}
+)
type ClusterLocalHosts struct {
specific map[host.Name]bool
@@ -8,7 +18,101 @@ type ClusterLocalHosts struct {
}
type ClusterLocalProvider interface {
- // GetClusterLocalHosts returns the list of cluster-local hosts, sorted
in
- // ascending order. The caller must not modify the returned list.
GetClusterLocalHosts() ClusterLocalHosts
}
+
+type clusterLocalProvider struct {
+ mutex sync.RWMutex
+ hosts ClusterLocalHosts
+}
+
+func NewClusterLocalProvider(e *Environment) ClusterLocalProvider {
+ c := &clusterLocalProvider{}
+
+ // Register a handler to update the environment when the mesh config is
updated.
+ e.AddMeshHandler(func() {
+ c.onMeshUpdated(e)
+ })
+
+ // Update the cluster-local hosts now.
+ c.onMeshUpdated(e)
+ return c
+}
+
+func (c *clusterLocalProvider) GetClusterLocalHosts() ClusterLocalHosts {
+ c.mutex.RLock()
+ out := c.hosts
+ c.mutex.RUnlock()
+ return out
+}
+
+func (c *clusterLocalProvider) onMeshUpdated(e *Environment) {
+ // Create the default list of cluster-local hosts.
+ domainSuffix := e.DomainSuffix
+ defaultClusterLocalHosts := make([]host.Name, 0)
+ for _, n := range defaultClusterLocalNamespaces {
+ defaultClusterLocalHosts = append(defaultClusterLocalHosts,
host.Name("*."+n+".svc."+domainSuffix))
+ }
+ for _, s := range defaultClusterLocalServices {
+ defaultClusterLocalHosts = append(defaultClusterLocalHosts,
host.Name(s+"."+domainSuffix))
+ }
+
+ if discoveryHost, _, err := e.GetDiscoveryAddress(); err != nil {
+ klog.Errorf("failed to make discoveryAddress cluster-local:
%v", err)
+ } else {
+ if !strings.HasSuffix(string(discoveryHost), domainSuffix) {
+ discoveryHost += host.Name("." + domainSuffix)
+ }
+ defaultClusterLocalHosts = append(defaultClusterLocalHosts,
discoveryHost)
+ }
+
+ // Collect the cluster-local hosts.
+ hosts := ClusterLocalHosts{
+ specific: make(map[host.Name]bool),
+ wildcard: make(map[host.Name]bool),
+ }
+
+ for _, serviceSettings := range e.Mesh().ServiceSettings {
+ isClusterLocal :=
serviceSettings.GetSettings().GetClusterLocal()
+ for _, h := range serviceSettings.GetHosts() {
+ // If clusterLocal false, check to see if we should
remove a default clusterLocal host.
+ if !isClusterLocal {
+ for i, defaultClusterLocalHost := range
defaultClusterLocalHosts {
+ if len(defaultClusterLocalHost) > 0 {
+ if h ==
string(defaultClusterLocalHost) ||
+
(defaultClusterLocalHost.IsWildCarded() &&
+
strings.HasSuffix(h, string(defaultClusterLocalHost[1:]))) {
+ // This default was
explicitly overridden, so remove it.
+
defaultClusterLocalHosts[i] = ""
+ }
+ }
+ }
+ }
+ }
+
+ // Add hosts with their clusterLocal setting to sets.
+ for _, h := range serviceSettings.GetHosts() {
+ hostname := host.Name(h)
+ if hostname.IsWildCarded() {
+ hosts.wildcard[hostname] = isClusterLocal
+ } else {
+ hosts.specific[hostname] = isClusterLocal
+ }
+ }
+ }
+
+ // Add any remaining defaults to the end of the list.
+ for _, defaultClusterLocalHost := range defaultClusterLocalHosts {
+ if len(defaultClusterLocalHost) > 0 {
+ if defaultClusterLocalHost.IsWildCarded() {
+ hosts.wildcard[defaultClusterLocalHost] = true
+ } else {
+ hosts.specific[defaultClusterLocalHost] = true
+ }
+ }
+ }
+
+ c.mutex.Lock()
+ c.hosts = hosts
+ c.mutex.Unlock()
+}
diff --git a/sail/pkg/model/config.go b/sail/pkg/model/config.go
index 9cd803ae..eead713c 100644
--- a/sail/pkg/model/config.go
+++ b/sail/pkg/model/config.go
@@ -4,9 +4,7 @@ import (
"cmp"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/schema/collection"
- "github.com/apache/dubbo-kubernetes/pkg/config/schema/kind"
"github.com/apache/dubbo-kubernetes/pkg/util/hash"
- "github.com/apache/dubbo-kubernetes/pkg/util/sets"
"sort"
)
@@ -53,15 +51,6 @@ func (key ConfigKey) String() string {
return key.Kind.String() + "/" + key.Namespace + "/" + key.Name
}
-func HasConfigsOfKind(configs sets.Set[ConfigKey], kind kind.Kind) bool {
- for c := range configs {
- if c.Kind == kind {
- return true
- }
- }
- return false
-}
-
func (key ConfigKey) HashCode() ConfigHash {
h := hash.New()
h.Write([]byte{byte(key.Kind)})
diff --git a/sail/pkg/model/context.go b/sail/pkg/model/context.go
index 35e39be1..4a1ad939 100644
--- a/sail/pkg/model/context.go
+++ b/sail/pkg/model/context.go
@@ -19,6 +19,7 @@ package model
import (
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/config/constants"
"github.com/apache/dubbo-kubernetes/pkg/config/host"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh"
"github.com/apache/dubbo-kubernetes/pkg/config/mesh/meshwatcher"
@@ -108,4 +109,18 @@ func (e *Environment) ClusterLocal() ClusterLocalProvider {
return e.clusterLocalServices
}
+func (e *Environment) Init() {
+ // Use a default DomainSuffix, if none was provided.
+ if len(e.DomainSuffix) == 0 {
+ e.DomainSuffix = constants.DefaultClusterLocalDomain
+ }
+
+ e.clusterLocalServices = NewClusterLocalProvider(e)
+}
+
+func (e *Environment) InitNetworksManager(updater XDSUpdater) (err error) {
+ e.NetworkManager, err = NewNetworkManager(e, updater)
+ return
+}
+
type Proxy struct{}
diff --git a/sail/pkg/model/network.go b/sail/pkg/model/network.go
index 66f7ce2c..744846a4 100644
--- a/sail/pkg/model/network.go
+++ b/sail/pkg/model/network.go
@@ -1,4 +1,14 @@
package model
type NetworkManager struct {
+ env *Environment
+ xdsUpdater XDSUpdater
+}
+
+func NewNetworkManager(env *Environment, xdsUpdater XDSUpdater)
(*NetworkManager, error) {
+ mgr := &NetworkManager{
+ env: env,
+ xdsUpdater: xdsUpdater,
+ }
+ return mgr, nil
}
diff --git a/security/pkg/credentialfetcher/fetcher.go
b/security/pkg/credentialfetcher/fetcher.go
deleted file mode 100644
index c95c6da5..00000000
--- a/security/pkg/credentialfetcher/fetcher.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package credentialfetcher
-
-import (
- "fmt"
-
- "github.com/apache/dubbo-kubernetes/pkg/security"
-
"github.com/apache/dubbo-kubernetes/security/pkg/credentialfetcher/plugin"
-)
-
-func NewCredFetcher(credtype, trustdomain, jwtPath, identityProvider string)
(security.CredFetcher, error) {
- switch credtype {
- case security.JWT, "":
- // If unset, also default to JWT for backwards compatibility
- if jwtPath == "" {
- return nil, nil // no cred fetcher - using certificates
only
- }
- return plugin.CreateTokenPlugin(jwtPath), nil
- default:
- return nil, fmt.Errorf("invalid credential fetcher type %s",
credtype)
- }
-}
diff --git a/security/pkg/pki/ca/selfsignedcarootcertrotator.go
b/security/pkg/pki/ca/selfsignedcarootcertrotator.go
index 5b80849e..f124432f 100644
--- a/security/pkg/pki/ca/selfsignedcarootcertrotator.go
+++ b/security/pkg/pki/ca/selfsignedcarootcertrotator.go
@@ -86,7 +86,7 @@ func (rotator *SelfSignedCARootCertRotator) Run(stopCh chan
struct{}) {
select {
case <-ticker.C:
klog.Info("Check and rotate root cert.")
- // rotator.checkAndRotateRootCert()
+ // TODO rotator.checkAndRotateRootCert()
case _, ok := <-stopCh:
if !ok {
klog.Info("Received stop signal, so stop the
root cert rotator.")
diff --git a/security/tools/generate_cert/main.go
b/security/tools/generate_cert/main.go
index 4dd9f90b..61803d74 100644
--- a/security/tools/generate_cert/main.go
+++ b/security/tools/generate_cert/main.go
@@ -101,7 +101,7 @@ func saveCreds(certPem []byte, privPem []byte) {
}
func signCertFromCitadel() (*x509.Certificate, crypto.PrivateKey) {
- args := []string{"get", "secret", "-n", "istio-system",
"istio-ca-secret", "-o", "json"}
+ args := []string{"get", "secret", "-n", "istio-system", "Dubbo", "-o",
"json"}
cmd := exec.Command("kubectl", args...)
out, err := cmd.CombinedOutput()
if err != nil {