This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git

commit 452d22b5541cca936ad4ca9acef4c300930181b1
Author: nobolity <[email protected]>
AuthorDate: Mon Jun 6 16:34:59 2022 +0800

    feat(master && worker): add feature hpa
---
 api/v1alpha1/ds_public.go                          |  22 +++-
 api/v1alpha1/dsmaster_types.go                     |   3 +-
 api/v1alpha1/dsworker_types.go                     |  19 ++--
 api/v1alpha1/zz_generated.deepcopy.go              |  40 ++++----
 .../ds.apache.dolphinscheduler.dev_dsmasters.yaml  |  15 +++
 .../ds.apache.dolphinscheduler.dev_dsworkers.yaml  |  33 ++++--
 config/ds/postgreSQL/postgres-service.yaml         |   4 +-
 config/samples/ds_v1alpha1_dsalert.yaml            |   2 +-
 config/samples/ds_v1alpha1_dsapi.yaml              |   4 +-
 config/samples/ds_v1alpha1_dsmaster.yaml           |  10 +-
 config/samples/ds_v1alpha1_dsworker.yaml           |  16 ++-
 controllers/dsalert_controller.go                  |   2 +-
 controllers/dsapi_controller.go                    |   1 -
 controllers/dsmaster_controller.go                 |  86 +++++++++++-----
 controllers/dsworker_controller.go                 | 114 ++++++++++++---------
 controllers/master_reconcile.go                    | 111 +++++++++++---------
 controllers/pod.go                                 |  27 ++---
 controllers/worker_reconcile.go                    |  53 +---------
 18 files changed, 315 insertions(+), 247 deletions(-)

diff --git a/api/v1alpha1/ds_public.go b/api/v1alpha1/ds_public.go
index efdb8b5..260097c 100644
--- a/api/v1alpha1/ds_public.go
+++ b/api/v1alpha1/ds_public.go
@@ -20,16 +20,22 @@ const (
        DsConditionScaling                   = "Scaling"
        DsConditionUpgrading                 = "Upgrading"
 
-       ClusterMembersAnnotation      = "github.com/nobolity/members"
-       ClusterUpgradeAnnotation      = "github.com/nobolity/upgrade"
-       ClusterBootStrappedAnnotation = "github.com/nobolity/bootstrapped"
-
        DsAppName              = "app"
+       APIVersion             = "ds.apache.dolphinscheduler.dev/v1alpha1"
        DsVersionLabel         = "ds-version"
+       DsLogVolumeName        = "ds-log"
+       DsLogVolumeMountDir    = "/opt/dolphinscheduler/logs"
+       DsShareVolumeName      = "ds-soft"
+       DsShareVolumeMountDir  = "/opt/soft"
+       DSVersion              = "v1alpha1"
        FinalizerName          = "github.com.nobolity.dolphinscheduler-operator"
        EnvZookeeper           = "REGISTRY_ZOOKEEPER_CONNECT_STRING"
        DsServiceLabel         = "service-name"
        DsServiceLabelValue    = "ds-service"
+       DsMasterLabel          = "ds-master"
+       DsHeadLessServiceLabel = "ds-operator-service"
+       DsWorkerLabel          = "ds-worker"
+       DsWorkerKind           = "DSWorker"
        DsAlert                = "ds-alert"
        DsAlertServiceValue    = "ds-alert-service"
        DsAlertDeploymentValue = "ds-alert-deployment"
@@ -42,6 +48,7 @@ const (
        DataSourcePassWord     = "SPRING_DATASOURCE_PASSWORD"
        DsApiPort              = 12345
        DsAlertPort            = 50052
+       DsWorkerHpa            = "ds-worker-hpa"
 )
 
 // DsCondition represents one current condition of a ds cluster.
@@ -109,6 +116,13 @@ type DeploymentPolicy struct {
        Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
 }
 
+type HpaPolicy struct {
+       MinReplicas           int32 `json:"min_replicas,omitempty"`
+       MaxReplicas           int32 `json:"max_replicas,omitempty"`
+       CPUAverageUtilization int32 `json:"cpu_average_utilization,omitempty"`
+       MEMAverageUtilization int32 `json:"mem_average_utilization,omitempty"`
+}
+
 type MembersStatus struct {
        // Ready are the dsMaster members that are ready to serve requests
        // The member names are the same as the dsMaster pod names
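The new HpaPolicy surfaces on the custom resources as an "hpa" block whose keys are the snake_case JSON tags above. A minimal sketch of the block as it would appear in a DSMaster spec, based on the sample manifest changed later in this commit (the cpu value here is illustrative, and either utilization field may be omitted):

    spec:
      hpa:
        min_replicas: 1
        max_replicas: 5
        cpu_average_utilization: 80    # percent of requested CPU; assumed value
        mem_average_utilization: 85    # percent of requested memory, as in the sample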
diff --git a/api/v1alpha1/dsmaster_types.go b/api/v1alpha1/dsmaster_types.go
index 581835c..dfb743d 100644
--- a/api/v1alpha1/dsmaster_types.go
+++ b/api/v1alpha1/dsmaster_types.go
@@ -57,6 +57,8 @@ type DSMasterSpec struct {
        // Updating Pod does not take effect on any existing dm-master pods.
        Pod *PodPolicy `json:"pod,omitempty"`
 
+       HpaPolicy *HpaPolicy `json:"hpa,omitempty"`
+
        // Paused is to pause the control of the operator for the ds-master .
        // +kubebuilder:default=false
        Paused bool `json:"paused,omitempty"`
@@ -89,7 +91,6 @@ type DSMasterStatus struct {
 
 //+kubebuilder:object:root=true
 //+kubebuilder:subresource:status
-
 // DSMaster is the Schema for the dsmasters API
 type DSMaster struct {
        metav1.TypeMeta   `json:",inline"`
diff --git a/api/v1alpha1/dsworker_types.go b/api/v1alpha1/dsworker_types.go
index 19f90b3..f142a41 100644
--- a/api/v1alpha1/dsworker_types.go
+++ b/api/v1alpha1/dsworker_types.go
@@ -68,9 +68,6 @@ type DSWorkerSpec struct {
 
        //LibPvcName define the address of lib pvc,the position is /opt/soft
        LibPvcName string `json:"lib_pvc_name,omitempty"`
-
-       //AlertConfig is the config of alertService
-       AlertConfig *AlertConfig `json:"alert_config,omitempty"`
 }
 
 // DSWorkerStatus defines the observed state of DSWorker
@@ -93,12 +90,18 @@ type DSWorkerStatus struct {
 
        // Members are the dsWorker members in the cluster
        Members MembersStatus `json:"members,omitempty"`
+
+       // Selector must be the string form of the selector
+       Selector string `json:"selector"`
 }
 
 //+kubebuilder:object:root=true
+//+kubebuilder:printcolumn:name="replicas",type="integer",JSONPath=".spec.replicas"
+//+kubebuilder:printcolumn:name="repository",type="string",JSONPath=".spec.repository"
+//+kubebuilder:printcolumn:name="version",type="string",JSONPath=".spec.version"
+//+kubebuilder:printcolumn:name="controlPaused",type="boolean",JSONPath=".spec.controlPaused"
 //+kubebuilder:subresource:status
-
-// DSWorker is the Schema for the dsworkers API
+//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector
 type DSWorker struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`
@@ -108,7 +111,6 @@ type DSWorker struct {
 }
 
 //+kubebuilder:object:root=true
-
 // DSWorkerList contains a list of DSWorker
 type DSWorkerList struct {
        metav1.TypeMeta `json:",inline"`
@@ -119,8 +121,3 @@ type DSWorkerList struct {
 func init() {
        SchemeBuilder.Register(&DSWorker{}, &DSWorkerList{})
 }
-
-type AlertConfig struct {
-       ServiceUrl string `json:"service_url,omitempty"`
-       Port       string `json:"port,omitempty"`
-}
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index f828d4d..99a89dc 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -26,21 +26,6 @@ import (
        runtime "k8s.io/apimachinery/pkg/runtime"
 )
 
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *AlertConfig) DeepCopyInto(out *AlertConfig) {
-       *out = *in
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AlertConfig.
-func (in *AlertConfig) DeepCopy() *AlertConfig {
-       if in == nil {
-               return nil
-       }
-       out := new(AlertConfig)
-       in.DeepCopyInto(out)
-       return out
-}
-
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *DSAlert) DeepCopyInto(out *DSAlert) {
        *out = *in
@@ -321,6 +306,11 @@ func (in *DSMasterSpec) DeepCopyInto(out *DSMasterSpec) {
                *out = new(PodPolicy)
                (*in).DeepCopyInto(*out)
        }
+       if in.HpaPolicy != nil {
+               in, out := &in.HpaPolicy, &out.HpaPolicy
+               *out = new(HpaPolicy)
+               **out = **in
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSMasterSpec.
@@ -426,11 +416,6 @@ func (in *DSWorkerSpec) DeepCopyInto(out *DSWorkerSpec) {
                *out = new(PodPolicy)
                (*in).DeepCopyInto(*out)
        }
-       if in.AlertConfig != nil {
-               in, out := &in.AlertConfig, &out.AlertConfig
-               *out = new(AlertConfig)
-               **out = **in
-       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSWorkerSpec.
@@ -543,6 +528,21 @@ func (in *DsCondition) DeepCopy() *DsCondition {
        return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *HpaPolicy) DeepCopyInto(out *HpaPolicy) {
+       *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HpaPolicy.
+func (in *HpaPolicy) DeepCopy() *HpaPolicy {
+       if in == nil {
+               return nil
+       }
+       out := new(HpaPolicy)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *MembersStatus) DeepCopyInto(out *MembersStatus) {
        *out = *in
diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml
index 1823d52..a7b01be 100644
--- a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml
+++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml
@@ -51,6 +51,21 @@ spec:
                 - url
                 - username
                 type: object
+              hpa:
+                properties:
+                  cpu_average_utilization:
+                    format: int32
+                    type: integer
+                  max_replicas:
+                    format: int32
+                    type: integer
+                  mem_average_utilization:
+                    format: int32
+                    type: integer
+                  min_replicas:
+                    format: int32
+                    type: integer
+                type: object
               log_pvc_name:
                 description: LogPvcName defines the  log capacity of application ,the
                   position is /opt/dolphinscheduler/logs eg 20Gi
diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml
index b3d53cb..b805afb 100644
--- a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml
+++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml
@@ -15,10 +15,22 @@ spec:
     singular: dsworker
   scope: Namespaced
   versions:
-  - name: v1alpha1
+  - additionalPrinterColumns:
+    - jsonPath: .spec.replicas
+      name: replicas
+      type: integer
+    - jsonPath: .spec.repository
+      name: repository
+      type: string
+    - jsonPath: .spec.version
+      name: version
+      type: string
+    - jsonPath: .spec.controlPaused
+      name: controlPaused
+      type: boolean
+    name: v1alpha1
     schema:
       openAPIV3Schema:
-        description: DSWorker is the Schema for the dsworkers API
         properties:
           apiVersion:
             description: 'APIVersion defines the versioned schema of this representation
@@ -35,14 +47,6 @@ spec:
           spec:
             description: DSWorkerSpec defines the desired state of DSWorker
             properties:
-              alert_config:
-                description: AlertConfig is the config of alertService
-                properties:
-                  port:
-                    type: string
-                  service_url:
-                    type: string
-                type: object
               datasource:
                 description: Datasource is the config of database
                 properties:
@@ -1419,11 +1423,20 @@ spec:
                 default: 0
                 description: Replicas is the current size of the cluster
                 type: integer
+              selector:
+                description: Selector must be the string form of the selector
+                type: string
+            required:
+            - selector
             type: object
         type: object
     served: true
     storage: true
     subresources:
+      scale:
+        labelSelectorPath: .status.selector
+        specReplicasPath: .spec.replicas
+        statusReplicasPath: .status.replicas
       status: {}
 status:
   acceptedNames:
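With the scale subresource mapping spec.replicas, status.replicas, and status.selector, the API server can now serve scale requests for DSWorker, which is what lets the autoscaler added in this commit target the CR. Given LabelForWorkerPod() in controllers/pod.go, the selector string the worker controller publishes should reduce to the single app label; a sketch of the resulting status (replica count illustrative):

    status:
      replicas: 1
      selector: app=ds-worker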
diff --git a/config/ds/postgreSQL/postgres-service.yaml b/config/ds/postgreSQL/postgres-service.yaml
index ab7a73b..1af9894 100644
--- a/config/ds/postgreSQL/postgres-service.yaml
+++ b/config/ds/postgreSQL/postgres-service.yaml
@@ -6,10 +6,12 @@ metadata:
     app: postgres
   namespace: ds
 spec:
-  type: LoadBalancer
+  type: NodePort
+  #type: LoadBalancer
   ports:
     - port: 5432
       targetPort: 5432
+      nodePort: 30022
       protocol: TCP
   selector:
     app: postgres
\ No newline at end of file
diff --git a/config/samples/ds_v1alpha1_dsalert.yaml b/config/samples/ds_v1alpha1_dsalert.yaml
index 487d026..56c5647 100644
--- a/config/samples/ds_v1alpha1_dsalert.yaml
+++ b/config/samples/ds_v1alpha1_dsalert.yaml
@@ -11,6 +11,6 @@ spec:
   repository: apache/dolphinscheduler-alert-server
   datasource:
     drive_name: "org.postgresql.Driver"
-    url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
     username: "postgresadmin"
     password: "admin12345"
diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml
index a330949..809c853 100644
--- a/config/samples/ds_v1alpha1_dsapi.yaml
+++ b/config/samples/ds_v1alpha1_dsapi.yaml
@@ -8,10 +8,10 @@ metadata:
 spec:
   replicas: 1
   version: 3.0.0-alpha
-  zookeeper_connect: "172.31.29.53:2181"
+  zookeeper_connect: "172.17.0.5:2181"
   repository: apache/dolphinscheduler-api
   datasource:
     drive_name: "org.postgresql.Driver"
-    url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
     username: "postgresadmin"
     password: "admin12345"
\ No newline at end of file
diff --git a/config/samples/ds_v1alpha1_dsmaster.yaml b/config/samples/ds_v1alpha1_dsmaster.yaml
index f4be4fb..8d0bbae 100644
--- a/config/samples/ds_v1alpha1_dsmaster.yaml
+++ b/config/samples/ds_v1alpha1_dsmaster.yaml
@@ -6,14 +6,18 @@ metadata:
   labels:
     app: ds-master
 spec:
-  replicas: 2
-  zookeeper_connect: "172.31.29.53:2181"
+  replicas: 1
+  zookeeper_connect: "172.17.0.5:2181"
   version: 3.0.0-alpha
   repository: apache/dolphinscheduler-master
   datasource:
     drive_name: "org.postgresql.Driver"
-    url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
     username: "postgresadmin"
     password: "admin12345"
+  hpa:
+    min_replicas: 1
+    max_replicas: 5
+    mem_average_utilization: 85
 
 
diff --git a/config/samples/ds_v1alpha1_dsworker.yaml b/config/samples/ds_v1alpha1_dsworker.yaml
index 7f818b2..92065f0 100644
--- a/config/samples/ds_v1alpha1_dsworker.yaml
+++ b/config/samples/ds_v1alpha1_dsworker.yaml
@@ -6,12 +6,20 @@ metadata:
   labels:
     app: ds-worker
 spec:
-  replicas: 3
-  zookeeper_connect: "172.31.29.53:2181"
+  replicas: 1
+  zookeeper_connect: "172.17.0.5:2181"
   version: 3.0.0-alpha
   repository: apache/dolphinscheduler-worker
   datasource:
     drive_name: "org.postgresql.Driver"
-    url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
     username: "postgresadmin"
-    password: "admin12345"
\ No newline at end of file
+    password: "admin12345"
+  pod:
+    resources:
+      limits:
+        cpu: "1000m"
+        memory: "2Gi"
+      requests:
+        cpu: "500m"
+        memory: "1Gi"
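A note on the pod.resources block added above: utilization-based HPA metrics are computed against container resource requests, so worker pods need requests set (as in this sample) before cpu_average_utilization or mem_average_utilization can take effect. This is standard Kubernetes HPA behavior rather than something the operator enforces itself.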
diff --git a/controllers/dsalert_controller.go b/controllers/dsalert_controller.go
index 2b6d88f..5f254ea 100644
--- a/controllers/dsalert_controller.go
+++ b/controllers/dsalert_controller.go
@@ -167,7 +167,7 @@ func (r *DSAlertReconciler) ensureAlertService(ctx context.Context, cluster *dsv
        namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsAlertServiceValue}
        if err := r.Client.Get(ctx, namespacedName, service); err != nil {
                // Local cache not found
-               logger.Info("get service error")
+               masterLogger.Info("get service error")
                if apierrors.IsNotFound(err) {
                        service = createAlertService(cluster)
                        if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil {
diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go
index 4e95511..7fbea79 100644
--- a/controllers/dsapi_controller.go
+++ b/controllers/dsapi_controller.go
@@ -50,7 +50,6 @@ type DSApiReconciler struct {
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
 // the DSApi object against the actual cluster state, and then
 // perform operations to make the cluster state reflect the state specified by
 // the user.
diff --git a/controllers/dsmaster_controller.go b/controllers/dsmaster_controller.go
index df05da5..18edab7 100644
--- a/controllers/dsmaster_controller.go
+++ b/controllers/dsmaster_controller.go
@@ -18,6 +18,7 @@ package controllers
 
 import (
        "context"
+       v2 "k8s.io/api/autoscaling/v2"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -40,13 +41,8 @@ import (
        dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
 )
 
-const (
-       dsMasterLabel  = "ds-master"
-       dsServiceLabel = "ds-operator-service"
-)
-
 var (
-       logger = ctrl.Log.WithName("DSMaster-controller")
+       masterLogger = ctrl.Log.WithName("DSMaster-controller")
 )
 
 // DSMasterReconciler reconciles a DSMaster object
@@ -71,8 +67,8 @@ type DSMasterReconciler struct {
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
 func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-       logger.Info("dmMaster start reconcile logic")
-       defer logger.Info("dmMaster Reconcile end ---------------------------------------------")
+       masterLogger.Info("dmMaster start reconcile logic")
+       defer masterLogger.Info("dmMaster Reconcile end ---------------------------------------------")
 
        cluster := &dsv1alpha1.DSMaster{}
 
@@ -118,7 +114,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
        // If dsmaster-cluster is paused, we do nothing on things changed.
        // Until dsmaster-cluster is un-paused, we will reconcile to the  state of that point.
        if cluster.Spec.Paused {
-               logger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name)
+               masterLogger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name)
                desired.Status.ControlPaused = true
                if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
                        return ctrl.Result{}, err
@@ -130,32 +126,39 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
        // 1. First time we see the ds-master-cluster, initialize it
        if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
                desired.Status.Phase = dsv1alpha1.DsPhaseCreating
-               logger.Info("phase had been changed from  none ---> creating")
+               masterLogger.Info("phase had been changed from  none ---> creating")
                err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
                return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
        }
 
        //2 ensure the headless service
-       logger.Info("Ensuring cluster service")
+       masterLogger.Info("Ensuring cluster service")
 
        if err := r.ensureMasterService(ctx, cluster); err != nil {
                return ctrl.Result{}, err
        }
 
-       // 3. Ensure bootstrapped, we will block here util cluster is up and healthy
-       logger.Info("Ensuring cluster members")
+       // 3. Ensure the worker HPA
+       masterLogger.Info("Ensuring worker hpa")
+
+       if err := r.ensureHPA(ctx, cluster); err != nil {
+               return ctrl.Result{}, err
+       }
+
+       // 4. Ensure bootstrapped, we will block here until cluster is up and healthy
+       masterLogger.Info("Ensuring cluster members")
        if requeue, err := r.ensureMembers(ctx, cluster); requeue {
                return ctrl.Result{RequeueAfter: 5 * time.Second}, err
        }
 
-       // 4. Ensure cluster scaled
-       logger.Info("Ensuring cluster scaled")
+       // 5. Ensure cluster scaled
+       masterLogger.Info("Ensuring cluster scaled")
        if requeue, err := r.ensureScaled(ctx, cluster); requeue {
                return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err
        }
 
-       // .5 Ensure cluster upgraded
-       logger.Info("Ensuring cluster upgraded")
+       // 6. Ensure cluster upgraded
+       masterLogger.Info("Ensuring cluster upgraded")
        if requeue, err := r.ensureUpgraded(ctx, cluster); requeue {
                return ctrl.Result{Requeue: true}, err
        }
@@ -165,7 +168,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
                return ctrl.Result{}, err
        }
 
-       logger.Info("******************************************************")
+       masterLogger.Info("******************************************************")
        desired.Status.Phase = dsv1alpha1.DsPhaseNone
        if err := r.Update(ctx, desired); err != nil {
                return ctrl.Result{}, err
@@ -184,6 +187,7 @@ func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error {
                For(&dsv1alpha1.DSMaster{}).
                Owns(&corev1.Pod{}).
                Owns(&corev1.Service{}).
+               Owns(&v2.HorizontalPodAutoscaler{}).
                Watches(&source.Channel{Source: r.resyncCh}, &handler.EnqueueRequestForObject{}).
                // or use WithEventFilter()
                WithEventFilter(filter).
@@ -239,11 +243,11 @@ func (r *DSMasterReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph
 }
 
 func (r *DSMasterReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
-       logger.Info("Starting add new member to cluster", "cluster", cluster.Name)
-       defer logger.Info("End add new member to cluster", "cluster", cluster.Name)
+       masterLogger.Info("Starting add new member to cluster", "cluster", cluster.Name)
+       defer masterLogger.Info("End add new member to cluster", "cluster", cluster.Name)
 
        // New Pod
-       pod, err := r.newDSMasterPod(ctx, cluster)
+       pod, err := r.newDSMasterPod(cluster)
        if err != nil {
                return err
        }
@@ -256,7 +260,7 @@ func (r *DSMasterReconciler) createMember(ctx context.Context, cluster *dsv1alph
 }
 
 func (r *DSMasterReconciler) deletePod(ctx context.Context, pod *corev1.Pod) error {
-       logger.Info("begin delete pod", "pod name", pod.Name)
+       masterLogger.Info("begin delete pod", "pod name", pod.Name)
        if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
                return err
        }
@@ -269,7 +273,7 @@ func (r *DSMasterReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1al
                return false, err
        }
 
-       logger.Info("cluster.Spec.Version", "cluster.Spec.Version", cluster.Spec.Version)
+       masterLogger.Info("cluster.Spec.Version", "cluster.Spec.Version", cluster.Spec.Version)
        for _, memset := range ms {
                if r.predicateUpdate(memset, cluster) {
                        pod := &corev1.Pod{}
@@ -289,7 +293,7 @@ func getNeedUpgradePods(ctx context.Context, cli *kubernetes.Clientset, cluster
        if err != nil {
                return nil, err
        }
-       podAppSelect, err := labels.NewRequirement(dsv1alpha1.DsAppName, selection.Equals, []string{dsMasterLabel})
+       podAppSelect, err := labels.NewRequirement(dsv1alpha1.DsAppName, selection.Equals, []string{dsv1alpha1.DsMasterLabel})
        if err != nil {
                return nil, err
        }
@@ -305,10 +309,10 @@ func (r *DSMasterReconciler) ensureMasterService(ctx context.Context, cluster *d
 
        // 1. Client service
        service := &corev1.Service{}
-       namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsServiceLabelValue}
+       namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsHeadLessServiceLabel}
        if err := r.Client.Get(ctx, namespacedName, service); err != nil {
                // Local cache not found
-               if apierrors.IsNotFound(err) {
+               if apierrors.IsNotFound(err) && !apierrors.IsAlreadyExists(err) {
                        service = createMasterService(cluster)
                        if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil {
                                return err
@@ -317,6 +321,8 @@ func (r *DSMasterReconciler) ensureMasterService(ctx context.Context, cluster *d
                        if err := r.Client.Create(ctx, service); err != nil {
                                return err
                        }
+
+                       r.recorder.Event(cluster, corev1.EventTypeNormal, "ds-operator-service created", "master headless service had been created")
                }
        }
        return nil
@@ -391,3 +397,31 @@ func (r *Predicate) Generic(evt event.GenericEvent) bool {
 func (r *DSMasterReconciler) predicateUpdate(member *Member, cluster *dsv1alpha1.DSMaster) bool {
        return member.Version != cluster.Spec.Version
 }
+
+func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.DSMaster) error {
+       hpa := &v2.HorizontalPodAutoscaler{}
+       namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsWorkerHpa}
+       if err := r.Client.Get(ctx, namespacedName, hpa); err != nil {
+               // Local cache not found
+               if apierrors.IsNotFound(err) && cluster.Spec.HpaPolicy != nil {
+                       hpa := r.createHPA(cluster)
+                       if err := controllerutil.SetControllerReference(cluster, hpa, r.Scheme); err != nil {
+                               masterLogger.Info("set controller worker hpa error")
+                               return err
+                       }
+                       // The remote object may already exist, so Create may return an error; on the next reconcile the Get above will succeed and this branch will not run
+                       if err := r.Client.Create(ctx, hpa); err != nil {
+                               masterLogger.Info("create worker hpa error")
+                               return err
+                       }
+               }
+       }
+
+       if &hpa != nil && cluster.Spec.HpaPolicy == nil {
+               if err := r.deleteHPA(ctx, hpa); err != nil {
+                       masterLogger.Info("delete hpa error")
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/controllers/dsworker_controller.go b/controllers/dsworker_controller.go
index e16abc7..7a9262c 100644
--- a/controllers/dsworker_controller.go
+++ b/controllers/dsworker_controller.go
@@ -18,35 +18,30 @@ package controllers
 
 import (
        "context"
-       "github.com/go-logr/logr"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/tools/record"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+       "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "time"
 
        dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
 )
 
-const (
-       dsWorkerLabel  = "ds-worker"
-       dsWorkerConfig = "ds-worker-config"
-)
-
 // DSWorkerReconciler reconciles a DSWorker object
 type DSWorkerReconciler struct {
        client.Client
-       Log      logr.Logger
        Scheme   *runtime.Scheme
-       Recorder record.EventRecorder
+       recorder record.EventRecorder
 }
 
 var (
-       worker_logger = ctrl.Log.WithName("DSWorker-controller")
+       workerLogger = ctrl.Log.WithName("DSWorker-controller")
 )
 
 
//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers,verbs=get;list;watch;create;update;patch;delete
@@ -62,14 +57,14 @@ var (
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
 func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-       worker_logger.Info("dmWorker start reconcile logic")
-       defer worker_logger.Info("dmWorker Reconcile end ---------------------------------------------")
+       workerLogger.Info("dmWorker start reconcile logic")
+       defer workerLogger.Info("dmWorker Reconcile end ---------------------------------------------")
 
        cluster := &dsv1alpha1.DSWorker{}
 
        if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
                if errors.IsNotFound(err) {
-                       r.Recorder.Event(cluster, corev1.EventTypeWarning, "dmWorker is not Found", "dmWorker is not Found")
+                       r.recorder.Event(cluster, corev1.EventTypeWarning, "dmWorker is not Found", "dmWorker is not Found")
                        return ctrl.Result{}, nil
                }
                return ctrl.Result{}, err
@@ -107,37 +102,45 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
        // If dsworker-cluster is paused, we do nothing on things changed.
        // Until dsworker-cluster is un-paused, we will reconcile to the dsworker state of that point.
        if cluster.Spec.Paused {
-               worker_logger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
+               workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name)
                desired.Status.ControlPaused = true
                if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
                        return ctrl.Result{}, err
                }
-               r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
+               r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing")
                return ctrl.Result{}, nil
        }
 
-       // 1. First time we see the ds-dsworker-cluster, initialize it
+       // 1. First time we see the ds-worker-cluster, initialize it
        if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
+               if desired.Status.Selector == "" {
+                       selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: LabelForWorkerPod()})
+                       if err != nil {
+                               workerLogger.Error(err, "Error retrieving selector labels")
+                               return reconcile.Result{}, err
+                       }
+                       desired.Status.Selector = selector.String()
+               }
                desired.Status.Phase = dsv1alpha1.DsPhaseCreating
-               worker_logger.Info("phase had been changed from  none ---> creating")
+               workerLogger.Info("phase had been changed from  none ---> creating")
                err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster))
                return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
        }
 
        // 3. Ensure bootstrapped, we will block here util cluster is up and healthy
-       worker_logger.Info("Ensuring cluster members")
+       workerLogger.Info("Ensuring cluster members")
        if requeue, err := r.ensureMembers(ctx, cluster); requeue {
                return ctrl.Result{RequeueAfter: 5 * time.Second}, err
        }
 
        // 4. Ensure cluster scaled
-       worker_logger.Info("Ensuring cluster scaled")
+       workerLogger.Info("Ensuring cluster scaled")
        if requeue, err := r.ensureScaled(ctx, cluster); requeue {
                return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err
        }
 
        // .5 Ensure cluster upgraded
-       worker_logger.Info("Ensuring cluster upgraded")
+       workerLogger.Info("Ensuring cluster upgraded")
        if requeue, err := r.ensureUpgraded(ctx, cluster); requeue {
                return ctrl.Result{Requeue: true}, err
        }
@@ -147,13 +150,14 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
                return ctrl.Result{}, err
        }
 
-       worker_logger.Info("******************************************************")
+       workerLogger.Info("******************************************************")
        desired.Status.Phase = dsv1alpha1.DsPhaseNone
        return ctrl.Result{Requeue: false}, nil
 }
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *DSWorkerReconciler) SetupWithManager(mgr ctrl.Manager) error {
+       r.recorder = mgr.GetEventRecorderFor("worker-controller")
        return ctrl.NewControllerManagedBy(mgr).
                For(&dsv1alpha1.DSWorker{}).
                Owns(&corev1.Pod{}).
@@ -185,7 +189,7 @@ func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph
        if len(ms) < cluster.Spec.Replicas {
                err = r.createMember(ctx, cluster)
                if err != nil {
-                       r.Recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-dsworker pod", "the ds-dsworker pod had been created failed")
+                       r.recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-dsworker pod", "the ds-dsworker pod had been created failed")
                        return true, err
                }
                return true, err
@@ -197,7 +201,7 @@ func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph
                member := ms.PickOne()
                pod.SetName(member.Name)
                pod.SetNamespace(member.Namespace)
-               err = r.deleteMember(ctx, pod)
+               err = r.deleteMember(ctx, pod, cluster)
                if err != nil {
                        return true, err
                }
@@ -207,9 +211,37 @@ func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph
        return false, nil
 }
 
+func (r *DSWorkerReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
+
+       ms, err := r.podMemberSet(ctx, cluster)
+       if err != nil {
+               return false, err
+       }
+
+       for _, memset := range ms {
+               if memset.Version != cluster.Spec.Version {
+                       pod := &corev1.Pod{}
+                       pod.SetName(memset.Name)
+                       pod.SetNamespace(memset.Namespace)
+                       if err := r.deleteMember(ctx, pod, cluster); err != nil {
+                               return false, err
+                       }
+                       return true, nil
+               }
+       }
+       return false, nil
+}
+
+func (r *DSWorkerReconciler) ensureDSWorkerDeleted(ctx context.Context, cluster *dsv1alpha1.DSWorker) error {
+       if err := r.Client.Delete(ctx, cluster, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+               return err
+       }
+       return nil
+}
+
 func (r *DSWorkerReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSWorker) error {
-       worker_logger.Info("Starting add new member to cluster", "cluster", cluster.Name)
-       defer worker_logger.Info("End add new member to cluster", "cluster", cluster.Name)
+       workerLogger.Info("Starting add new member to cluster", "cluster", cluster.Name)
+       defer workerLogger.Info("End add new member to cluster", "cluster", cluster.Name)
 
        // New Pod
        pod, err := r.newDSWorkerPod(ctx, cluster)
@@ -221,36 +253,26 @@ func (r *DSWorkerReconciler) createMember(ctx context.Context, cluster *dsv1alph
        if err = r.Client.Create(ctx, pod); err != nil && !apierrors.IsAlreadyExists(err) {
                return err
        }
-       return nil
-}
-
-func (r *DSWorkerReconciler) deleteMember(ctx context.Context, pod *corev1.Pod) error {
 
-       worker_logger.Info("begin delete pod", "pod name", pod.Name)
-       if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
+       desired := cluster.DeepCopy()
+       desired.Spec.Replicas += 1
+       if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
                return err
        }
-
        return nil
 }
 
-func (r *DSWorkerReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) {
+func (r *DSWorkerReconciler) deleteMember(ctx context.Context, pod *corev1.Pod, cluster *dsv1alpha1.DSWorker) error {
 
-       ms, err := r.podMemberSet(ctx, cluster)
-       if err != nil {
-               return false, err
+       workerLogger.Info("begin delete pod", "pod name", pod.Name)
+       if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
+               return err
        }
 
-       for _, memset := range ms {
-               if memset.Version != cluster.Spec.Version {
-                       pod := &corev1.Pod{}
-                       pod.SetName(memset.Name)
-                       pod.SetNamespace(memset.Namespace)
-                       if err := r.deleteMember(ctx, pod); err != nil {
-                               return false, err
-                       }
-                       return true, nil
-               }
+       desired := cluster.DeepCopy()
+       desired.Spec.Replicas -= 1
+       if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil {
+               return err
        }
-       return false, nil
+       return nil
 }
diff --git a/controllers/master_reconcile.go b/controllers/master_reconcile.go
index 2c6e722..e480b7f 100644
--- a/controllers/master_reconcile.go
+++ b/controllers/master_reconcile.go
@@ -19,12 +19,12 @@ package controllers
 import (
        "context"
        dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
-       "errors"
+       v2 "k8s.io/api/autoscaling/v2"
+       _ "k8s.io/api/core/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
-       "strings"
 )
 
 func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSMaster) (MemberSet, error) {
@@ -32,7 +32,7 @@ func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph
        pods := &corev1.PodList{}
 
        if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace),
-               client.MatchingLabels(LabelsForCluster(dsMasterLabel))); err != nil {
+               client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsMasterLabel))); err != nil {
                return members, err
        }
 
@@ -55,46 +55,6 @@ func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph
        return members, nil
 }
 
-func (r *DSMasterReconciler) currentMemberSet(ctx context.Context, cluster *dsv1alpha1.DSMaster) (MemberSet, error) {
-       members := MemberSet{}
-
-       // Normally will not happen
-       ms, ok := cluster.Annotations[dsv1alpha1.ClusterMembersAnnotation]
-       if !ok || ms == "" {
-               return members, errors.New("cluster spec has no members annotation")
-       }
-
-       names := strings.Split(ms, ",")
-
-       pods := &corev1.PodList{}
-       if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace),
-               client.MatchingLabels(LabelsForCluster(dsMasterLabel))); err != nil {
-               return members, err
-       }
-
-       podMaps := map[string]corev1.Pod{}
-       for _, pod := range pods.Items {
-               podMaps[pod.Name] = pod
-       }
-
-       for _, name := range names {
-               m := &Member{
-                       Name:            name,
-                       Namespace:       cluster.Namespace,
-                       Created:         false,
-                       RunningAndReady: false,
-               }
-
-               if pod, ok := podMaps[name]; ok {
-                       m.Created = true
-                       m.RunningAndReady = IsRunningAndReady(&pod)
-                       m.Version = pod.Labels[dsv1alpha1.DsVersionLabel]
-               }
-               members.Add(m)
-       }
-       return members, nil
-}
-
 func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod {
        var isSetHostnameAsFQDN bool
        isSetHostnameAsFQDN = true
@@ -103,7 +63,7 @@ func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod {
                ObjectMeta: metav1.ObjectMeta{
                        Name:      podName,
                        Namespace: cr.Namespace,
-                       Labels: map[string]string{dsv1alpha1.DsAppName: dsMasterLabel,
+                       Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsMasterLabel,
                                dsv1alpha1.DsVersionLabel: cr.Spec.Version,
                                dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue},
                },
@@ -151,7 +111,7 @@ func (r *DSMasterReconciler) ensureDSMasterDeleted(ctx context.Context, DSMaster
        return nil
 }
 
-func (r *DSMasterReconciler) newDSMasterPod(ctx context.Context, cluster *dsv1alpha1.DSMaster) (*corev1.Pod, error) {
+func (r *DSMasterReconciler) newDSMasterPod(cluster *dsv1alpha1.DSMaster) (*corev1.Pod, error) {
        // Create pod
        pod := newDSMasterPod(cluster)
        if err := controllerutil.SetControllerReference(cluster, pod, r.Scheme); err != nil {
@@ -166,10 +126,9 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service {
        labels_ := LabelsForService()
        service := corev1.Service{
                ObjectMeta: metav1.ObjectMeta{
-                       Name:         dsv1alpha1.DsServiceLabelValue,
-                       GenerateName: dsv1alpha1.DsServiceLabelValue,
-                       Namespace:    cluster.Namespace,
-                       Labels:       map[string]string{dsv1alpha1.DsAppName: dsServiceLabel},
+                       Name:      dsv1alpha1.DsHeadLessServiceLabel,
+                       Namespace: cluster.Namespace,
+                       Labels:    map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsHeadLessServiceLabel},
                },
                Spec: corev1.ServiceSpec{
                        Selector:                 labels_,
@@ -179,3 +138,57 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service {
        }
        return &service
 }
+
+func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.HorizontalPodAutoscaler {
+       hpa := v2.HorizontalPodAutoscaler{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:            dsv1alpha1.DsWorkerHpa,
+                       Namespace:       cluster.Namespace,
+                       ResourceVersion: dsv1alpha1.DSVersion,
+               },
+               Spec: v2.HorizontalPodAutoscalerSpec{
+                       ScaleTargetRef: v2.CrossVersionObjectReference{
+                               Kind:       dsv1alpha1.DsWorkerKind,
+                               Name:       dsv1alpha1.DsWorkerLabel,
+                               APIVersion: dsv1alpha1.APIVersion,
+                       },
+                       MinReplicas: &cluster.Spec.HpaPolicy.MinReplicas,
+                       MaxReplicas: cluster.Spec.HpaPolicy.MaxReplicas,
+               },
+       }
+
+       if cluster.Spec.HpaPolicy.CPUAverageUtilization > 0 {
+               hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{
+                       Type: v2.ResourceMetricSourceType,
+                       Resource: &v2.ResourceMetricSource{
+                               Name: corev1.ResourceCPU,
+                               Target: v2.MetricTarget{
+                                       Type:               v2.UtilizationMetricType,
+                                       AverageUtilization: &cluster.Spec.HpaPolicy.CPUAverageUtilization,
+                               },
+                       },
+               })
+       }
+
+       if cluster.Spec.HpaPolicy.MEMAverageUtilization > 0 {
+               hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{
+                       Type: v2.ResourceMetricSourceType,
+                       Resource: &v2.ResourceMetricSource{
+                               Name: corev1.ResourceMemory,
+                               Target: v2.MetricTarget{
+                                       Type:               v2.UtilizationMetricType,
+                                       AverageUtilization: &cluster.Spec.HpaPolicy.MEMAverageUtilization,
+                               },
+                       },
+               })
+       }
+
+       return &hpa
+}
+
+func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2.HorizontalPodAutoscaler) error {
+       if err := r.Client.Delete(ctx, hpa, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
+               return err
+       }
+       return nil
+}
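For orientation, the HorizontalPodAutoscaler that createHPA assembles corresponds to roughly the following manifest, assuming both utilization fields of HpaPolicy are set (the numbers are illustrative and mirror the dsmaster sample; each metric is appended only when its field is greater than zero):

    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: ds-worker-hpa
      namespace: ds              # assumed: the owning cluster's namespace
    spec:
      scaleTargetRef:
        apiVersion: ds.apache.dolphinscheduler.dev/v1alpha1
        kind: DSWorker
        name: ds-worker
      minReplicas: 1
      maxReplicas: 5
      metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 80
      - type: Resource
        resource:
          name: memory
          target:
            type: Utilization
            averageUtilization: 85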
diff --git a/controllers/pod.go b/controllers/pod.go
index 35caf01..405c7ac 100644
--- a/controllers/pod.go
+++ b/controllers/pod.go
@@ -23,16 +23,6 @@ import (
        "k8s.io/apimachinery/pkg/labels"
 )
 
-const (
-       dsLogVolumeName       = "ds-log"
-       dsLogVolumeMountDir   = "/opt/dolphinscheduler/logs"
-       dsShareVolumeName     = "ds-soft"
-       dsShareVolumeMountDir = "/opt/soft"
-)
-
-type PodTemplate struct {
-}
-
 func applyPodPolicy(pod *corev1.Pod, policy *dsv1alpha1.PodPolicy) {
        if policy == nil {
                return
@@ -53,6 +43,7 @@ func applyPodPolicy(pod *corev1.Pod, policy *dsv1alpha1.PodPolicy) {
        mergeLabels(pod.Labels, policy.Labels)
 
        if &policy.Resources != nil {
+               workerLogger.Info("the resources is ", "resources", policy.Resources)
                pod.Spec.Containers[0] = containerWithRequirements(pod.Spec.Containers[0], policy.Resources)
        }
 
@@ -79,6 +70,10 @@ func PodWithNodeSelector(p *corev1.Pod, ns map[string]string) *corev1.Pod {
        return p
 }
 
+func LabelForWorkerPod() map[string]string {
+       return LabelsForCluster(dsv1alpha1.DsWorkerLabel)
+}
+
 func LabelsForCluster(lbs string) map[string]string {
        return labels.Set{dsv1alpha1.DsAppName: lbs}
 }
@@ -89,11 +84,11 @@ func LabelsForService() map[string]string {
 
 // AddLogVolumeToPod abstract the process of appending volume spec to pod spec
 func AddLogVolumeToPod(pod *corev1.Pod, pvcName string) {
-       vol := corev1.Volume{Name: dsLogVolumeName}
+       vol := corev1.Volume{Name: dsv1alpha1.DsLogVolumeName}
 
        vom := corev1.VolumeMount{
-               Name:      dsLogVolumeName,
-               MountPath: dsLogVolumeMountDir,
+               Name:      dsv1alpha1.DsLogVolumeName,
+               MountPath: dsv1alpha1.DsLogVolumeMountDir,
                SubPath:   pod.Name,
        }
 
@@ -112,11 +107,11 @@ func AddLogVolumeToPod(pod *corev1.Pod, pvcName string) {
 // AddLibVolumeToPod abstract the process of appending volume /opt/soft spec to pod spec,it is shared by all worker nodes,and it is read only
 // Suggest to mount a share volume in production env directly
 func AddLibVolumeToPod(pod *corev1.Pod, pvcName string) {
-       vol := corev1.Volume{Name: dsShareVolumeName}
+       vol := corev1.Volume{Name: dsv1alpha1.DsShareVolumeName}
 
        vom := corev1.VolumeMount{
-               Name:      dsShareVolumeName,
-               MountPath: dsShareVolumeMountDir,
+               Name:      dsv1alpha1.DsShareVolumeName,
+               MountPath: dsv1alpha1.DsShareVolumeMountDir,
                ReadOnly:  true,
        }
 
diff --git a/controllers/worker_reconcile.go b/controllers/worker_reconcile.go
index 8ffe129..d0ce14e 100644
--- a/controllers/worker_reconcile.go
+++ b/controllers/worker_reconcile.go
@@ -19,12 +19,10 @@ package controllers
 import (
        "context"
        dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
-       "errors"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
-       "strings"
 )
 
 func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSWorker) (MemberSet, error) {
@@ -32,7 +30,7 @@ func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph
        pods := &corev1.PodList{}
 
        if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace),
-               client.MatchingLabels(LabelsForCluster(dsWorkerLabel))); err != nil {
+               client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsWorkerLabel))); err != nil {
                return members, err
        }
 
@@ -55,53 +53,13 @@ func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph
        return members, nil
 }
 
-func (r *DSWorkerReconciler) currentMemberSet(ctx context.Context, cluster *dsv1alpha1.DSWorker) (MemberSet, error) {
-       members := MemberSet{}
-
-       // Normally will not happen
-       ms, ok := cluster.Annotations[dsv1alpha1.ClusterMembersAnnotation]
-       if !ok || ms == "" {
-               return members, errors.New("cluster spec has no members annotation")
-       }
-
-       names := strings.Split(ms, ",")
-
-       pods := &corev1.PodList{}
-       if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace),
-               client.MatchingLabels(LabelsForCluster(dsWorkerLabel))); err != nil {
-               return members, err
-       }
-
-       podMaps := map[string]corev1.Pod{}
-       for _, pod := range pods.Items {
-               podMaps[pod.Name] = pod
-       }
-
-       for _, name := range names {
-               m := &Member{
-                       Name:            name,
-                       Namespace:       cluster.Namespace,
-                       Created:         false,
-                       RunningAndReady: false,
-               }
-
-               if pod, ok := podMaps[name]; ok {
-                       m.Created = true
-                       m.RunningAndReady = IsRunningAndReady(&pod)
-                       m.Version = pod.Labels[dsv1alpha1.DsVersionLabel]
-               }
-               members.Add(m)
-       }
-       return members, nil
-}
-
 func newDSWorkerPod(cr *dsv1alpha1.DSWorker) *corev1.Pod {
        var podName = cr.Name + "-pod" + dsv1alpha1.RandStr(6)
        return &corev1.Pod{
                ObjectMeta: metav1.ObjectMeta{
                        Name:      podName,
                        Namespace: cr.Namespace,
-                       Labels: map[string]string{dsv1alpha1.DsAppName: dsWorkerLabel,
+                       Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsWorkerLabel,
                                dsv1alpha1.DsVersionLabel: cr.Spec.Version,
                                dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue,
                        },
@@ -146,13 +104,6 @@ func newDSWorkerPod(cr *dsv1alpha1.DSWorker) *corev1.Pod {
        }
 }
 
-func (r *DSWorkerReconciler) ensureDSWorkerDeleted(ctx context.Context, dsWorker *dsv1alpha1.DSWorker) error {
-       if err := r.Client.Delete(ctx, dsWorker, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil {
-               return err
-       }
-       return nil
-}
-
 func (r *DSWorkerReconciler) newDSWorkerPod(ctx context.Context, cluster *dsv1alpha1.DSWorker) (*corev1.Pod, error) {
        // Create pod
        pod := newDSWorkerPod(cluster)
