This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git

commit 65a35d4750ad0594b1aee5ed9f8080b59d491a91
Author: nobolity <[email protected]>
AuthorDate: Sat May 28 22:35:03 2022 +0800

    feat(operator): add alert
---
 PROJECT                               |   9 +++
 api/v1alpha1/ds_public.go             |  22 +++----
 api/v1alpha1/zz_generated.deepcopy.go | 105 +++++++++++++++++++++++++++++++++
 config/crd/kustomization.yaml         |   3 +
 config/ds/alert/ds-alert-service.yaml |   3 +-
 config/rbac/role.yaml                 |  26 +++++++++
 controllers/alert_reconcile.go        | 107 ++++++++++++++++++++++++++++++++++
 controllers/deployment.go             |  47 +++++++++++++++
 controllers/dsmaster_controller.go    |  96 +++++++++++++++++++++++++++---
 main.go                               |   7 +++
 10 files changed, 406 insertions(+), 19 deletions(-)

diff --git a/PROJECT b/PROJECT
index 6245a8a..55173fc 100644
--- a/PROJECT
+++ b/PROJECT
@@ -22,4 +22,13 @@ resources:
   kind: DSWorker
   path: dolphinscheduler-operator/api/v1alpha1
   version: v1alpha1
+- api:
+    crdVersion: v1
+    namespaced: true
+  controller: true
+  domain: apache.dolphinscheduler.dev
+  group: ds
+  kind: DSAlert
+  path: dolphinscheduler-operator/api/v1alpha1
+  version: v1alpha1
 version: "3"
diff --git a/api/v1alpha1/ds_public.go b/api/v1alpha1/ds_public.go
index c971727..bf82016 100644
--- a/api/v1alpha1/ds_public.go
+++ b/api/v1alpha1/ds_public.go
@@ -24,16 +24,18 @@ const (
        ClusterUpgradeAnnotation      = "github.com/nobolity/upgrade"
        ClusterBootStrappedAnnotation = "github.com/nobolity/bootstrapped"
 
-       DsAppName           = "app"
-       DsVersionLabel      = "ds-version"
-       FinalizerName       = "github.com.nobolity.dolphinscheduler-operator"
-       EnvZookeeper        = "REGISTRY_ZOOKEEPER_CONNECT_STRING"
-       DsServiceLabel      = "service-name"
-       DsServiceLabelValue = "ds-service"
-       DataSourceDriveName = "SPRING_DATASOURCE_DRIVER_CLASS_NAME"
-       DataSourceUrl       = "SPRING_DATASOURCE_URL"
-       DataSourceUserName  = "SPRING_DATASOURCE_USERNAME"
-       DataSourcePassWord  = "SPRING_DATASOURCE_PASSWORD"
+       DsAppName              = "app"
+       DsVersionLabel         = "ds-version"
+       FinalizerName          = "github.com.nobolity.dolphinscheduler-operator"
+       EnvZookeeper           = "REGISTRY_ZOOKEEPER_CONNECT_STRING"
+       DsServiceLabel         = "service-name"
+       DsServiceLabelValue    = "ds-service"
+       DsAlertServiceValue    = "ds-alert-service"
+       DsAlertDeploymentValue = "ds-alert-deployment"
+       DataSourceDriveName    = "SPRING_DATASOURCE_DRIVER_CLASS_NAME"
+       DataSourceUrl          = "SPRING_DATASOURCE_URL"
+       DataSourceUserName     = "SPRING_DATASOURCE_USERNAME"
+       DataSourcePassWord     = "SPRING_DATASOURCE_PASSWORD"
 )
 
 // DsCondition represents one current condition of a ds cluster.
diff --git a/api/v1alpha1/zz_generated.deepcopy.go 
b/api/v1alpha1/zz_generated.deepcopy.go
index bd6e45c..4592b8f 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -41,6 +41,111 @@ func (in *AlertConfig) DeepCopy() *AlertConfig {
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *DSAlert) DeepCopyInto(out *DSAlert) {
+       *out = *in
+       out.TypeMeta = in.TypeMeta
+       in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+       in.Spec.DeepCopyInto(&out.Spec)
+       in.Status.DeepCopyInto(&out.Status)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new DSAlert.
+func (in *DSAlert) DeepCopy() *DSAlert {
+       if in == nil {
+               return nil
+       }
+       out := new(DSAlert)
+       in.DeepCopyInto(out)
+       return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, 
creating a new runtime.Object.
+func (in *DSAlert) DeepCopyObject() runtime.Object {
+       if c := in.DeepCopy(); c != nil {
+               return c
+       }
+       return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *DSAlertList) DeepCopyInto(out *DSAlertList) {
+       *out = *in
+       out.TypeMeta = in.TypeMeta
+       in.ListMeta.DeepCopyInto(&out.ListMeta)
+       if in.Items != nil {
+               in, out := &in.Items, &out.Items
+               *out = make([]DSAlert, len(*in))
+               for i := range *in {
+                       (*in)[i].DeepCopyInto(&(*out)[i])
+               }
+       }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new DSAlertList.
+func (in *DSAlertList) DeepCopy() *DSAlertList {
+       if in == nil {
+               return nil
+       }
+       out := new(DSAlertList)
+       in.DeepCopyInto(out)
+       return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, 
creating a new runtime.Object.
+func (in *DSAlertList) DeepCopyObject() runtime.Object {
+       if c := in.DeepCopy(); c != nil {
+               return c
+       }
+       return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *DSAlertSpec) DeepCopyInto(out *DSAlertSpec) {
+       *out = *in
+       if in.Datasource != nil {
+               in, out := &in.Datasource, &out.Datasource
+               *out = new(DateSourceTemplate)
+               **out = **in
+       }
+       if in.Pod != nil {
+               in, out := &in.Pod, &out.Pod
+               *out = new(PodPolicy)
+               (*in).DeepCopyInto(*out)
+       }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new DSAlertSpec.
+func (in *DSAlertSpec) DeepCopy() *DSAlertSpec {
+       if in == nil {
+               return nil
+       }
+       out := new(DSAlertSpec)
+       in.DeepCopyInto(out)
+       return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *DSAlertStatus) DeepCopyInto(out *DSAlertStatus) {
+       *out = *in
+       if in.Conditions != nil {
+               in, out := &in.Conditions, &out.Conditions
+               *out = make([]DsCondition, len(*in))
+               copy(*out, *in)
+       }
+       in.Members.DeepCopyInto(&out.Members)
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new DSAlertStatus.
+func (in *DSAlertStatus) DeepCopy() *DSAlertStatus {
+       if in == nil {
+               return nil
+       }
+       out := new(DSAlertStatus)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *DSMaster) DeepCopyInto(out *DSMaster) {
        *out = *in
diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml
index fc07787..578c9e7 100644
--- a/config/crd/kustomization.yaml
+++ b/config/crd/kustomization.yaml
@@ -4,6 +4,7 @@
 resources:
 - bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml
 - bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml
+- bases/ds.apache.dolphinscheduler.dev_dsalerts.yaml
 #+kubebuilder:scaffold:crdkustomizeresource
 
 patchesStrategicMerge:
@@ -11,12 +12,14 @@ patchesStrategicMerge:
 # patches here are for enabling the conversion webhook for each CRD
 #- patches/webhook_in_dsmasters.yaml
 #- patches/webhook_in_dsworkers.yaml
+#- patches/webhook_in_dsalerts.yaml
 #+kubebuilder:scaffold:crdkustomizewebhookpatch
 
 # [CERTMANAGER] To enable cert-manager, uncomment all the sections with 
[CERTMANAGER] prefix.
 # patches here are for enabling the CA injection for each CRD
 #- patches/cainjection_in_dsmasters.yaml
 #- patches/cainjection_in_dsworkers.yaml
+#- patches/cainjection_in_dsalerts.yaml
 #+kubebuilder:scaffold:crdkustomizecainjectionpatch
 
 # the following config is for teaching kustomize how to do kustomization for 
CRDs.
diff --git a/config/ds/alert/ds-alert-service.yaml 
b/config/ds/alert/ds-alert-service.yaml
index 8761644..21bc518 100644
--- a/config/ds/alert/ds-alert-service.yaml
+++ b/config/ds/alert/ds-alert-service.yaml
@@ -9,9 +9,8 @@ spec:
     - protocol: TCP
       port: 50052
       targetPort: 50052
-      nodePort: 30002
   selector:
-    app: ds-api
+    app: ds-alert
 
 
 
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index edb21b7..9ef8743 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -5,6 +5,32 @@ metadata:
   creationTimestamp: null
   name: manager-role
 rules:
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsalerts
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsalerts/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsalerts/status
+  verbs:
+  - get
+  - patch
+  - update
 - apiGroups:
   - ds.apache.dolphinscheduler.dev
   resources:
diff --git a/controllers/alert_reconcile.go b/controllers/alert_reconcile.go
new file mode 100644
index 0000000..8326f23
--- /dev/null
+++ b/controllers/alert_reconcile.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2022 nobolity.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+       dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
+       v1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/intstr"
+)
+
+// createAlertService builds the Service object that fronts the alert server
+// pods of the given DSAlert.
+//
+// The Service lives in the namespace of cluster, is named and labelled with
+// dsv1alpha1.DsAlertServiceValue, selects pods labelled app=ds-alert and
+// forwards TCP port 50052 to container port 50052. The object is only
+// constructed here; creating it in the API server is left to the caller.
+func createAlertService(cluster *dsv1alpha1.DSAlert) *corev1.Service {
+       return &corev1.Service{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      dsv1alpha1.DsAlertServiceValue,
+                       Namespace: cluster.Namespace,
+                       Labels:    map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsAlertServiceValue},
+               },
+               Spec: corev1.ServiceSpec{
+                       Selector: map[string]string{dsv1alpha1.DsAppName: "ds-alert"},
+                       Ports: []corev1.ServicePort{{
+                               Protocol: corev1.ProtocolTCP,
+                               Port:     50052,
+                               // Spell out the integer kind instead of relying on the zero value.
+                               TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 50052},
+                       }},
+               },
+       }
+}
+
+// createAlertDeployment builds the Deployment object that runs the alert
+// server for the given DSAlert.
+//
+// The Deployment is named dsv1alpha1.DsAlertDeploymentValue and is placed in
+// the namespace of cluster, the same namespace createAlertService uses, so the
+// Service selector (app=ds-alert) can reach its pods. The replica count comes
+// from cluster.Spec.Replicas and the image from ImageName(Repository, Version).
+// When cluster.Spec.Datasource is set, its driver, URL, user name and password
+// are exported to the container as the SPRING_DATASOURCE_* environment
+// variables; when it is nil no datasource variables are set.
+func createAlertDeployment(cluster *dsv1alpha1.DSAlert) *v1.Deployment {
+       // Datasource is a pointer field and may be nil; guard it rather than
+       // panicking while rendering the pod template.
+       var env []corev1.EnvVar
+       if ds := cluster.Spec.Datasource; ds != nil {
+               env = []corev1.EnvVar{
+                       {Name: dsv1alpha1.DataSourceDriveName, Value: ds.DriveName},
+                       {Name: dsv1alpha1.DataSourceUrl, Value: ds.Url},
+                       {Name: dsv1alpha1.DataSourceUserName, Value: ds.UserName},
+                       {Name: dsv1alpha1.DataSourcePassWord, Value: ds.Password},
+               }
+       }
+
+       return &v1.Deployment{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name: dsv1alpha1.DsAlertDeploymentValue,
+                       // Follow the custom resource's namespace instead of a fixed "ds".
+                       Namespace: cluster.Namespace,
+               },
+               Spec: v1.DeploymentSpec{
+                       Replicas: int32Ptr(int32(cluster.Spec.Replicas)),
+                       Selector: &metav1.LabelSelector{
+                               MatchLabels: map[string]string{dsv1alpha1.DsAppName: "ds-alert"},
+                       },
+                       Template: corev1.PodTemplateSpec{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       // Must match both the selector above and the Service selector.
+                                       Labels: map[string]string{dsv1alpha1.DsAppName: "ds-alert"},
+                               },
+                               Spec: corev1.PodSpec{
+                                       Containers: []corev1.Container{{
+                                               Name:            "ds-alert",
+                                               Image:           ImageName(cluster.Spec.Repository, cluster.Spec.Version),
+                                               ImagePullPolicy: corev1.PullIfNotPresent,
+                                               Env:             env,
+                                               Ports:           []corev1.ContainerPort{{ContainerPort: 50052}},
+                                       }},
+                               },
+                       },
+               },
+       }
+}
+
+// int32Ptr returns a pointer to a fresh int32 variable holding the value i.
+func int32Ptr(i int32) *int32 {
+       value := i
+       return &value
+}
diff --git a/controllers/deployment.go b/controllers/deployment.go
new file mode 100644
index 0000000..3c0bc70
--- /dev/null
+++ b/controllers/deployment.go
@@ -0,0 +1,47 @@
+package controllers
+
+import (
+       v1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+)
+
+// IsDeploymentAvailable reports whether the deployment's status carries an
+// Available condition whose status is True. A nil deployment is reported as
+// not available instead of causing a nil-pointer panic.
+func IsDeploymentAvailable(deployment *v1.Deployment) bool {
+       if deployment == nil {
+               return false
+       }
+       return IsDeploymentAvailableConditionTrue(deployment.Status)
+}
+
+// IsDeploymentAvailableConditionTrue reports whether status contains an
+// Available condition and that condition's status is True. A missing
+// condition yields false.
+func IsDeploymentAvailableConditionTrue(status v1.DeploymentStatus) bool {
+       if available := GetDeploymentAvailableCondition(status); available != nil {
+               return available.Status == corev1.ConditionTrue
+       }
+       return false
+}
+
+// GetDeploymentAvailableCondition looks up the Available condition in the
+// given deployment status. It returns nil when the status holds no such
+// condition.
+func GetDeploymentAvailableCondition(status v1.DeploymentStatus) *v1.DeploymentCondition {
+       _, available := GetDeploymentCondition(&status, v1.DeploymentAvailable)
+       return available
+}
+
+// GetDeploymentCondition searches status for a condition of the requested
+// type. It returns the index of the condition together with a pointer to it,
+// or -1 and nil when status is nil or has no condition of that type.
+func GetDeploymentCondition(status *v1.DeploymentStatus, conditionType v1.DeploymentConditionType) (int, *v1.DeploymentCondition) {
+       if status != nil {
+               return GetDeploymentConditionFromList(status.Conditions, conditionType)
+       }
+       return -1, nil
+}
+
+// GetDeploymentConditionFromList scans conditions for the first entry of the
+// requested type. It returns that entry's index and a pointer to the element
+// inside the slice, or -1 and nil when the slice is nil, empty, or holds no
+// such condition.
+func GetDeploymentConditionFromList(conditions []v1.DeploymentCondition, conditionType v1.DeploymentConditionType) (int, *v1.DeploymentCondition) {
+       // A nil or empty slice simply skips the loop, so no separate nil check is needed.
+       for idx := 0; idx < len(conditions); idx++ {
+               if current := &conditions[idx]; current.Type == conditionType {
+                       return idx, current
+               }
+       }
+       return -1, nil
+}
diff --git a/controllers/dsmaster_controller.go 
b/controllers/dsmaster_controller.go
index 9bb5c1a..559d61b 100644
--- a/controllers/dsmaster_controller.go
+++ b/controllers/dsmaster_controller.go
@@ -31,6 +31,10 @@ import (
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+       "sigs.k8s.io/controller-runtime/pkg/event"
+       "sigs.k8s.io/controller-runtime/pkg/handler"
+       "sigs.k8s.io/controller-runtime/pkg/source"
+       "sync"
        "time"
 
        dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
@@ -38,7 +42,6 @@ import (
 
 const (
        dsMasterLabel  = "ds-master"
-       dsMasterConfig = "ds-master-config"
        dsServiceLabel = "ds-operator-service"
        dsServiceName  = "ds-operator-service"
 )
@@ -52,6 +55,8 @@ type DSMasterReconciler struct {
        client.Client
        Scheme   *runtime.Scheme
        Recorder record.EventRecorder
+       clusters sync.Map
+       resyncCh chan event.GenericEvent
 }
 
 
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters,verbs=get;list;watch;create;update;patch;delete
@@ -84,9 +89,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, 
req ctrl.Request) (c
        // Handler finalizer
        // examine DeletionTimestamp to determine if object is under deletion
        if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
-               ms, _ := r.podMemberSet(ctx, cluster)
-               logger.Info("pods is", "pod", ms)
-
                // The object is not being deleted, so if it does not have our 
finalizer,
                // then lets add the finalizer and update the object. This is 
equivalent
                // registering our finalizer.
@@ -111,9 +113,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, 
req ctrl.Request) (c
                                return ctrl.Result{}, err
                        }
                }
-
-               ms, _ := r.podMemberSet(ctx, cluster)
-               logger.Info("pods is", "pod", ms)
                // Stop reconciliation as the item is being deleted
                return ctrl.Result{}, nil
        }
@@ -170,13 +169,26 @@ func (r *DSMasterReconciler) Reconcile(ctx 
context.Context, req ctrl.Request) (c
 
        logger.Info("******************************************************")
        desired.Status.Phase = dsv1alpha1.DsPhaseNone
+       if err := r.Update(ctx, desired); err != nil {
+               return ctrl.Result{}, err
+       }
        return ctrl.Result{Requeue: false}, nil
 }
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
+       r.clusters = sync.Map{}
+       r.resyncCh = make(chan event.GenericEvent)
+       r.Recorder = mgr.GetEventRecorderFor("master-controller")
+
+       filter := &Predicate{}
        return ctrl.NewControllerManagedBy(mgr).
                For(&dsv1alpha1.DSMaster{}).
+               Owns(&corev1.Pod{}).
+               Owns(&corev1.Service{}).
+               Watches(&source.Channel{Source: r.resyncCh}, 
&handler.EnqueueRequestForObject{}).
+               // Run every watched event through the Predicate filter before it is queued.
+               WithEventFilter(filter).
                Complete(r)
 }
 
@@ -318,3 +330,73 @@ func (r *DSMasterReconciler) ensureMasterService(ctx 
context.Context, cluster *d
        }
        return nil
 }
+
+// Predicate is the event filter installed on the DSMaster controller in
+// SetupWithManager. Its Create, Update, Delete and Generic methods decide,
+// per event kind, whether an event is handed on for reconciliation.
+type Predicate struct{}
+
+// Create reports whether a create event should be processed. Create events
+// are triggered when an object is created, or when a restarted controller
+// sees an existing object for the first time. Only events whose object is a
+// DSMaster pass the filter.
+func (r *Predicate) Create(evt event.CreateEvent) bool {
+       _, isMaster := evt.Object.(*dsv1alpha1.DSMaster)
+       return isMaster
+}
+
+// Update reports whether an update event should be processed. Events whose
+// new object is not a DSMaster are dropped. For a DSMaster the event passes
+// when the replica count, the paused flag or the version changed, or when the
+// object has just been marked for deletion while still carrying the
+// operator's finalizer (cleanup has not run yet).
+func (r *Predicate) Update(evt event.UpdateEvent) bool {
+       newC, isMaster := evt.ObjectNew.(*dsv1alpha1.DSMaster)
+       if !isMaster {
+               return false
+       }
+       oldC := evt.ObjectOld.(*dsv1alpha1.DSMaster)
+
+       wasDeleting := !oldC.ObjectMeta.DeletionTimestamp.IsZero()
+       isDeleting := !newC.ObjectMeta.DeletionTimestamp.IsZero()
+
+       logger.V(5).Info("Running update filter",
+               "Old size", oldC.Spec.Replicas,
+               "New size", newC.Spec.Replicas,
+               "old paused", oldC.Spec.Paused,
+               "new paused", newC.Spec.Paused,
+               "old object deletion", wasDeleting,
+               "new object deletion", isDeleting)
+
+       // Only the size, paused and version fields of the spec are of interest.
+       specChanged := oldC.Spec.Replicas != newC.Spec.Replicas ||
+               oldC.Spec.Paused != newC.Spec.Paused ||
+               oldC.Spec.Version != newC.Spec.Version
+       if specChanged {
+               return true
+       }
+
+       // The object was just marked as deleted: let the event through while our
+       // finalizer is still present, because the cleanup work is not done yet.
+       deletionStarted := !wasDeleting && isDeleting
+       return deletionStarted && controllerutil.ContainsFinalizer(newC, dsv1alpha1.FinalizerName)
+}
+
+// Delete reports whether a delete event should be processed. Deletions of
+// DSMaster, Pod and Service objects pass the filter; any other type is
+// dropped.
+func (r *Predicate) Delete(evt event.DeleteEvent) bool {
+       switch evt.Object.(type) {
+       case *dsv1alpha1.DSMaster, *corev1.Pod, *corev1.Service:
+               return true
+       default:
+               return false
+       }
+}
+
+// Generic reports whether a generic event should be processed. Only events
+// whose object is a DSMaster pass the filter.
+func (r *Predicate) Generic(evt event.GenericEvent) bool {
+       _, isMaster := evt.Object.(*dsv1alpha1.DSMaster)
+       return isMaster
+}
diff --git a/main.go b/main.go
index 832088e..9f12997 100644
--- a/main.go
+++ b/main.go
@@ -92,6 +92,13 @@ func main() {
                setupLog.Error(err, "unable to create controller", 
"controller", "DSWorker")
                os.Exit(1)
        }
+       if err = (&controllers.DSAlertReconciler{
+               Client: mgr.GetClient(),
+               Scheme: mgr.GetScheme(),
+       }).SetupWithManager(mgr); err != nil {
+               setupLog.Error(err, "unable to create controller", 
"controller", "DSAlert")
+               os.Exit(1)
+       }
        //+kubebuilder:scaffold:builder
 
        if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {

Reply via email to