This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git
commit 65a35d4750ad0594b1aee5ed9f8080b59d491a91 Author: nobolity <[email protected]> AuthorDate: Sat May 28 22:35:03 2022 +0800 feat(operator): add alert --- PROJECT | 9 +++ api/v1alpha1/ds_public.go | 22 +++---- api/v1alpha1/zz_generated.deepcopy.go | 105 +++++++++++++++++++++++++++++++++ config/crd/kustomization.yaml | 3 + config/ds/alert/ds-alert-service.yaml | 3 +- config/rbac/role.yaml | 26 +++++++++ controllers/alert_reconcile.go | 107 ++++++++++++++++++++++++++++++++++ controllers/deployment.go | 47 +++++++++++++++ controllers/dsmaster_controller.go | 96 +++++++++++++++++++++++++++--- main.go | 7 +++ 10 files changed, 406 insertions(+), 19 deletions(-) diff --git a/PROJECT b/PROJECT index 6245a8a..55173fc 100644 --- a/PROJECT +++ b/PROJECT @@ -22,4 +22,13 @@ resources: kind: DSWorker path: dolphinscheduler-operator/api/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: apache.dolphinscheduler.dev + group: ds + kind: DSAlert + path: dolphinscheduler-operator/api/v1alpha1 + version: v1alpha1 version: "3" diff --git a/api/v1alpha1/ds_public.go b/api/v1alpha1/ds_public.go index c971727..bf82016 100644 --- a/api/v1alpha1/ds_public.go +++ b/api/v1alpha1/ds_public.go @@ -24,16 +24,18 @@ const ( ClusterUpgradeAnnotation = "github.com/nobolity/upgrade" ClusterBootStrappedAnnotation = "github.com/nobolity/bootstrapped" - DsAppName = "app" - DsVersionLabel = "ds-version" - FinalizerName = "github.com.nobolity.dolphinscheduler-operator" - EnvZookeeper = "REGISTRY_ZOOKEEPER_CONNECT_STRING" - DsServiceLabel = "service-name" - DsServiceLabelValue = "ds-service" - DataSourceDriveName = "SPRING_DATASOURCE_DRIVER_CLASS_NAME" - DataSourceUrl = "SPRING_DATASOURCE_URL" - DataSourceUserName = "SPRING_DATASOURCE_USERNAME" - DataSourcePassWord = "SPRING_DATASOURCE_PASSWORD" + DsAppName = "app" + DsVersionLabel = "ds-version" + FinalizerName = "github.com.nobolity.dolphinscheduler-operator" + EnvZookeeper = "REGISTRY_ZOOKEEPER_CONNECT_STRING" + DsServiceLabel = "service-name" + DsServiceLabelValue = "ds-service" + DsAlertServiceValue = "ds-alert-service" + DsAlertDeploymentValue = "ds-alert-deployment" + DataSourceDriveName = "SPRING_DATASOURCE_DRIVER_CLASS_NAME" + DataSourceUrl = "SPRING_DATASOURCE_URL" + DataSourceUserName = "SPRING_DATASOURCE_USERNAME" + DataSourcePassWord = "SPRING_DATASOURCE_PASSWORD" ) // DsCondition represents one current condition of a ds cluster. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index bd6e45c..4592b8f 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -41,6 +41,111 @@ func (in *AlertConfig) DeepCopy() *AlertConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DSAlert) DeepCopyInto(out *DSAlert) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSAlert. +func (in *DSAlert) DeepCopy() *DSAlert { + if in == nil { + return nil + } + out := new(DSAlert) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DSAlert) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DSAlertList) DeepCopyInto(out *DSAlertList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]DSAlert, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSAlertList. +func (in *DSAlertList) DeepCopy() *DSAlertList { + if in == nil { + return nil + } + out := new(DSAlertList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *DSAlertList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DSAlertSpec) DeepCopyInto(out *DSAlertSpec) { + *out = *in + if in.Datasource != nil { + in, out := &in.Datasource, &out.Datasource + *out = new(DateSourceTemplate) + **out = **in + } + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + *out = new(PodPolicy) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSAlertSpec. +func (in *DSAlertSpec) DeepCopy() *DSAlertSpec { + if in == nil { + return nil + } + out := new(DSAlertSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DSAlertStatus) DeepCopyInto(out *DSAlertStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]DsCondition, len(*in)) + copy(*out, *in) + } + in.Members.DeepCopyInto(&out.Members) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSAlertStatus. +func (in *DSAlertStatus) DeepCopy() *DSAlertStatus { + if in == nil { + return nil + } + out := new(DSAlertStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DSMaster) DeepCopyInto(out *DSMaster) { *out = *in diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index fc07787..578c9e7 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -4,6 +4,7 @@ resources: - bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml - bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml +- bases/ds.apache.dolphinscheduler.dev_dsalerts.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -11,12 +12,14 @@ patchesStrategicMerge: # patches here are for enabling the conversion webhook for each CRD #- patches/webhook_in_dsmasters.yaml #- patches/webhook_in_dsworkers.yaml +#- patches/webhook_in_dsalerts.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. # patches here are for enabling the CA injection for each CRD #- patches/cainjection_in_dsmasters.yaml #- patches/cainjection_in_dsworkers.yaml +#- patches/cainjection_in_dsalerts.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/ds/alert/ds-alert-service.yaml b/config/ds/alert/ds-alert-service.yaml index 8761644..21bc518 100644 --- a/config/ds/alert/ds-alert-service.yaml +++ b/config/ds/alert/ds-alert-service.yaml @@ -9,9 +9,8 @@ spec: - protocol: TCP port: 50052 targetPort: 50052 - nodePort: 30002 selector: - app: ds-api + app: ds-alert diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index edb21b7..9ef8743 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -5,6 +5,32 @@ metadata: creationTimestamp: null name: manager-role rules: +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsalerts + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsalerts/finalizers + verbs: + - update +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsalerts/status + verbs: + - get + - patch + - update - apiGroups: - ds.apache.dolphinscheduler.dev resources: diff --git a/controllers/alert_reconcile.go b/controllers/alert_reconcile.go new file mode 100644 index 0000000..8326f23 --- /dev/null +++ b/controllers/alert_reconcile.go @@ -0,0 +1,107 @@ +/* +Copyright 2022 nobolity. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func createAlertService(cluster *dsv1alpha1.DSAlert) *corev1.Service { + service := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-alert-service", + Namespace: cluster.Namespace, + Labels: map[string]string{dsv1alpha1.DsAppName: "ds-alert-service"}, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{dsv1alpha1.DsAppName: "ds-alert"}, + Ports: []corev1.ServicePort{ + { + Protocol: corev1.ProtocolTCP, + Port: *int32Ptr(int32(50052)), + TargetPort: intstr.IntOrString{ + IntVal: 50052, + }, + }, + }, + }, + } + return &service +} + +func createAlertDeployment(cluster *dsv1alpha1.DSAlert) *v1.Deployment { + alertDeployment := v1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds-alert-deployment", + Namespace: "ds", + }, + Spec: v1.DeploymentSpec{ + Replicas: int32Ptr(int32(cluster.Spec.Replicas)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "ds-alert", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "ds-alert", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "ds-alert", + Image: ImageName(cluster.Spec.Repository, cluster.Spec.Version), + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + { + Name: dsv1alpha1.DataSourceDriveName, + Value: cluster.Spec.Datasource.DriveName, + }, + { + Name: dsv1alpha1.DataSourceUrl, + Value: cluster.Spec.Datasource.Url, + }, + { + Name: dsv1alpha1.DataSourceUserName, + Value: cluster.Spec.Datasource.UserName, + }, + { + Name: dsv1alpha1.DataSourcePassWord, + Value: cluster.Spec.Datasource.Password, + }, + }, + Ports: []corev1.ContainerPort{{ + ContainerPort: 50052, + }, + }, + }, + }, + }, + }, + }, + } + return &alertDeployment +} + +func int32Ptr(i int32) *int32 { + return &i +} diff --git a/controllers/deployment.go b/controllers/deployment.go new file mode 100644 index 0000000..3c0bc70 --- /dev/null +++ b/controllers/deployment.go @@ -0,0 +1,47 @@ +package controllers + +import ( + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" +) + +// IsDeploymentAvailable returns true if a pod is ready; false otherwise. +func IsDeploymentAvailable(deployment *v1.Deployment) bool { + return IsDeploymentAvailableConditionTrue(deployment.Status) +} + +// IsDeploymentAvailableConditionTrue returns true if a deployment is available; false otherwise. +func IsDeploymentAvailableConditionTrue(status v1.DeploymentStatus) bool { + condition := GetDeploymentAvailableCondition(status) + return condition != nil && condition.Status == corev1.ConditionTrue +} + +// GetDeploymentReadyCondition extracts the deployment available condition from the given status and returns that. +// Returns nil if the condition is not present. +func GetDeploymentAvailableCondition(status v1.DeploymentStatus) *v1.DeploymentCondition { + _, condition := GetDeploymentCondition(&status, v1.DeploymentAvailable) + return condition +} + +// GetDeploymentCondition extracts the provided condition from the given status and returns that. +// Returns nil and -1 if the condition is not present, and the index of the located condition. +func GetDeploymentCondition(status *v1.DeploymentStatus, conditionType v1.DeploymentConditionType) (int, *v1.DeploymentCondition) { + if status == nil { + return -1, nil + } + return GetDeploymentConditionFromList(status.Conditions, conditionType) +} + +// GetDeploymentConditionFromList extracts the provided condition from the given list of condition and +// returns the index of the condition and the condition. Returns -1 and nil if the condition is not present. +func GetDeploymentConditionFromList(conditions []v1.DeploymentCondition, conditionType v1.DeploymentConditionType) (int, *v1.DeploymentCondition) { + if conditions == nil { + return -1, nil + } + for i := range conditions { + if conditions[i].Type == conditionType { + return i, &conditions[i] + } + } + return -1, nil +} diff --git a/controllers/dsmaster_controller.go b/controllers/dsmaster_controller.go index 9bb5c1a..559d61b 100644 --- a/controllers/dsmaster_controller.go +++ b/controllers/dsmaster_controller.go @@ -31,6 +31,10 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" + "sync" "time" dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" @@ -38,7 +42,6 @@ import ( const ( dsMasterLabel = "ds-master" - dsMasterConfig = "ds-master-config" dsServiceLabel = "ds-operator-service" dsServiceName = "ds-operator-service" ) @@ -52,6 +55,8 @@ type DSMasterReconciler struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder + clusters sync.Map + resyncCh chan event.GenericEvent } //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters,verbs=get;list;watch;create;update;patch;delete @@ -84,9 +89,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // Handler finalizer // examine DeletionTimestamp to determine if object is under deletion if cluster.ObjectMeta.DeletionTimestamp.IsZero() { - ms, _ := r.podMemberSet(ctx, cluster) - logger.Info("pods is", "pod", ms) - // The object is not being deleted, so if it does not have our finalizer, // then lets add the finalizer and update the object. This is equivalent // registering our finalizer. @@ -111,9 +113,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } } - - ms, _ := r.podMemberSet(ctx, cluster) - logger.Info("pods is", "pod", ms) // Stop reconciliation as the item is being deleted return ctrl.Result{}, nil } @@ -170,13 +169,26 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c logger.Info("******************************************************") desired.Status.Phase = dsv1alpha1.DsPhaseNone + if err := r.Update(ctx, desired); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{Requeue: false}, nil } // SetupWithManager sets up the controller with the Manager. func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.clusters = sync.Map{} + r.resyncCh = make(chan event.GenericEvent) + r.Recorder = mgr.GetEventRecorderFor("master-controller") + + filter := &Predicate{} return ctrl.NewControllerManagedBy(mgr). For(&dsv1alpha1.DSMaster{}). + Owns(&corev1.Pod{}). + Owns(&corev1.Service{}). + Watches(&source.Channel{Source: r.resyncCh}, &handler.EnqueueRequestForObject{}). + // or use WithEventFilter() + WithEventFilter(filter). Complete(r) } @@ -318,3 +330,73 @@ func (r *DSMasterReconciler) ensureMasterService(ctx context.Context, cluster *d } return nil } + +type Predicate struct{} + +// Create will be trigger when object created or controller restart +// first time see the object +func (r *Predicate) Create(evt event.CreateEvent) bool { + switch evt.Object.(type) { + case *dsv1alpha1.DSMaster: + return true + } + return false +} + +func (r *Predicate) Update(evt event.UpdateEvent) bool { + switch evt.ObjectNew.(type) { + case *dsv1alpha1.DSMaster: + oldC := evt.ObjectOld.(*dsv1alpha1.DSMaster) + newC := evt.ObjectNew.(*dsv1alpha1.DSMaster) + + logger.V(5).Info("Running update filter", + "Old size", oldC.Spec.Replicas, + "New size", newC.Spec.Replicas, + "old paused", oldC.Spec.Paused, + "new paused", newC.Spec.Paused, + "old object deletion", !oldC.ObjectMeta.DeletionTimestamp.IsZero(), + "new object deletion", !newC.ObjectMeta.DeletionTimestamp.IsZero()) + + // Only care about size, version and paused fields + if oldC.Spec.Replicas != newC.Spec.Replicas { + return true + } + + if oldC.Spec.Paused != newC.Spec.Paused { + return true + } + + if oldC.Spec.Version != newC.Spec.Version { + return true + } + + // If cluster has been marked as deleted, check if we have remove our finalizer + // If it has our finalizer, indicating our cleaning up works has not been done. + if oldC.DeletionTimestamp.IsZero() && !newC.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(newC, dsv1alpha1.FinalizerName) { + return true + } + } + } + return false +} + +func (r *Predicate) Delete(evt event.DeleteEvent) bool { + switch evt.Object.(type) { + case *dsv1alpha1.DSMaster: + return true + case *corev1.Pod: + return true + case *corev1.Service: + return true + } + return false +} + +func (r *Predicate) Generic(evt event.GenericEvent) bool { + switch evt.Object.(type) { + case *dsv1alpha1.DSMaster: + return true + } + return false +} diff --git a/main.go b/main.go index 832088e..9f12997 100644 --- a/main.go +++ b/main.go @@ -92,6 +92,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "DSWorker") os.Exit(1) } + if err = (&controllers.DSAlertReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "DSAlert") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
