This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git
commit 452d22b5541cca936ad4ca9acef4c300930181b1 Author: nobolity <[email protected]> AuthorDate: Mon Jun 6 16:34:59 2022 +0800 feat(master && worker): add feature hpa --- api/v1alpha1/ds_public.go | 22 +++- api/v1alpha1/dsmaster_types.go | 3 +- api/v1alpha1/dsworker_types.go | 19 ++-- api/v1alpha1/zz_generated.deepcopy.go | 40 ++++---- .../ds.apache.dolphinscheduler.dev_dsmasters.yaml | 15 +++ .../ds.apache.dolphinscheduler.dev_dsworkers.yaml | 33 ++++-- config/ds/postgreSQL/postgres-service.yaml | 4 +- config/samples/ds_v1alpha1_dsalert.yaml | 2 +- config/samples/ds_v1alpha1_dsapi.yaml | 4 +- config/samples/ds_v1alpha1_dsmaster.yaml | 10 +- config/samples/ds_v1alpha1_dsworker.yaml | 16 ++- controllers/dsalert_controller.go | 2 +- controllers/dsapi_controller.go | 1 - controllers/dsmaster_controller.go | 86 +++++++++++----- controllers/dsworker_controller.go | 114 ++++++++++++--------- controllers/master_reconcile.go | 111 +++++++++++--------- controllers/pod.go | 27 ++--- controllers/worker_reconcile.go | 53 +--------- 18 files changed, 315 insertions(+), 247 deletions(-) diff --git a/api/v1alpha1/ds_public.go b/api/v1alpha1/ds_public.go index efdb8b5..260097c 100644 --- a/api/v1alpha1/ds_public.go +++ b/api/v1alpha1/ds_public.go @@ -20,16 +20,22 @@ const ( DsConditionScaling = "Scaling" DsConditionUpgrading = "Upgrading" - ClusterMembersAnnotation = "github.com/nobolity/members" - ClusterUpgradeAnnotation = "github.com/nobolity/upgrade" - ClusterBootStrappedAnnotation = "github.com/nobolity/bootstrapped" - DsAppName = "app" + APIVersion = "ds.apache.dolphinscheduler.dev/v1alpha1" DsVersionLabel = "ds-version" + DsLogVolumeName = "ds-log" + DsLogVolumeMountDir = "/opt/dolphinscheduler/logs" + DsShareVolumeName = "ds-soft" + DsShareVolumeMountDir = "/opt/soft" + DSVersion = "v1alpha1" FinalizerName = "github.com.nobolity.dolphinscheduler-operator" EnvZookeeper = "REGISTRY_ZOOKEEPER_CONNECT_STRING" DsServiceLabel = "service-name" DsServiceLabelValue = "ds-service" + DsMasterLabel = "ds-master" + DsHeadLessServiceLabel = "ds-operator-service" + DsWorkerLabel = "ds-worker" + DsWorkerKind = "DSWorker" DsAlert = "ds-alert" DsAlertServiceValue = "ds-alert-service" DsAlertDeploymentValue = "ds-alert-deployment" @@ -42,6 +48,7 @@ const ( DataSourcePassWord = "SPRING_DATASOURCE_PASSWORD" DsApiPort = 12345 DsAlertPort = 50052 + DsWorkerHpa = "ds-worker-hpa" ) // DsCondition represents one current condition of a ds cluster. @@ -109,6 +116,13 @@ type DeploymentPolicy struct { Tolerations []corev1.Toleration `json:"tolerations,omitempty"` } +type HpaPolicy struct { + MinReplicas int32 `json:"min_replicas,omitempty"` + MaxReplicas int32 `json:"max_replicas,omitempty"` + CPUAverageUtilization int32 `json:"cpu_average_utilization,omitempty"` + MEMAverageUtilization int32 `json:"mem_average_utilization,omitempty"` +} + type MembersStatus struct { // Ready are the dsMaster members that are ready to serve requests // The member names are the same as the dsMaster pod names diff --git a/api/v1alpha1/dsmaster_types.go b/api/v1alpha1/dsmaster_types.go index 581835c..dfb743d 100644 --- a/api/v1alpha1/dsmaster_types.go +++ b/api/v1alpha1/dsmaster_types.go @@ -57,6 +57,8 @@ type DSMasterSpec struct { // Updating Pod does not take effect on any existing dm-master pods. Pod *PodPolicy `json:"pod,omitempty"` + HpaPolicy *HpaPolicy `json:"hpa,omitempty"` + // Paused is to pause the control of the operator for the ds-master . // +kubebuilder:default=false Paused bool `json:"paused,omitempty"` @@ -89,7 +91,6 @@ type DSMasterStatus struct { //+kubebuilder:object:root=true //+kubebuilder:subresource:status - // DSMaster is the Schema for the dsmasters API type DSMaster struct { metav1.TypeMeta `json:",inline"` diff --git a/api/v1alpha1/dsworker_types.go b/api/v1alpha1/dsworker_types.go index 19f90b3..f142a41 100644 --- a/api/v1alpha1/dsworker_types.go +++ b/api/v1alpha1/dsworker_types.go @@ -68,9 +68,6 @@ type DSWorkerSpec struct { //LibPvcName define the address of lib pvc,the position is /opt/soft LibPvcName string `json:"lib_pvc_name,omitempty"` - - //AlertConfig is the config of alertService - AlertConfig *AlertConfig `json:"alert_config,omitempty"` } // DSWorkerStatus defines the observed state of DSWorker @@ -93,12 +90,18 @@ type DSWorkerStatus struct { // Members are the dsWorker members in the cluster Members MembersStatus `json:"members,omitempty"` + + // Selector must be the string form of the selector + Selector string `json:"selector"` } //+kubebuilder:object:root=true +//+kubebuilder:printcolumn:name="replicas",type="integer",JSONPath=".spec.replicas" +//+kubebuilder:printcolumn:name="repository",type="string",JSONPath=".spec.repository" +//+kubebuilder:printcolumn:name="version",type="string",JSONPath=".spec.version" +//+kubebuilder:printcolumn:name="controlPaused",type="boolean",JSONPath=".spec.controlPaused" //+kubebuilder:subresource:status - -// DSWorker is the Schema for the dsworkers API +//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector type DSWorker struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -108,7 +111,6 @@ type DSWorker struct { } //+kubebuilder:object:root=true - // DSWorkerList contains a list of DSWorker type DSWorkerList struct { metav1.TypeMeta `json:",inline"` @@ -119,8 +121,3 @@ type DSWorkerList struct { func init() { SchemeBuilder.Register(&DSWorker{}, &DSWorkerList{}) } - -type AlertConfig struct { - ServiceUrl string `json:"service_url,omitempty"` - Port string `json:"port,omitempty"` -} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index f828d4d..99a89dc 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -26,21 +26,6 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *AlertConfig) DeepCopyInto(out *AlertConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AlertConfig. -func (in *AlertConfig) DeepCopy() *AlertConfig { - if in == nil { - return nil - } - out := new(AlertConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DSAlert) DeepCopyInto(out *DSAlert) { *out = *in @@ -321,6 +306,11 @@ func (in *DSMasterSpec) DeepCopyInto(out *DSMasterSpec) { *out = new(PodPolicy) (*in).DeepCopyInto(*out) } + if in.HpaPolicy != nil { + in, out := &in.HpaPolicy, &out.HpaPolicy + *out = new(HpaPolicy) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSMasterSpec. @@ -426,11 +416,6 @@ func (in *DSWorkerSpec) DeepCopyInto(out *DSWorkerSpec) { *out = new(PodPolicy) (*in).DeepCopyInto(*out) } - if in.AlertConfig != nil { - in, out := &in.AlertConfig, &out.AlertConfig - *out = new(AlertConfig) - **out = **in - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DSWorkerSpec. @@ -543,6 +528,21 @@ func (in *DsCondition) DeepCopy() *DsCondition { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HpaPolicy) DeepCopyInto(out *HpaPolicy) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HpaPolicy. +func (in *HpaPolicy) DeepCopy() *HpaPolicy { + if in == nil { + return nil + } + out := new(HpaPolicy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MembersStatus) DeepCopyInto(out *MembersStatus) { *out = *in diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml index 1823d52..a7b01be 100644 --- a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml +++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsmasters.yaml @@ -51,6 +51,21 @@ spec: - url - username type: object + hpa: + properties: + cpu_average_utilization: + format: int32 + type: integer + max_replicas: + format: int32 + type: integer + mem_average_utilization: + format: int32 + type: integer + min_replicas: + format: int32 + type: integer + type: object log_pvc_name: description: LogPvcName defines the log capacity of application ,the position is /opt/dolphinscheduler/logs eg 20Gi diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml index b3d53cb..b805afb 100644 --- a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml +++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsworkers.yaml @@ -15,10 +15,22 @@ spec: singular: dsworker scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - jsonPath: .spec.replicas + name: replicas + type: integer + - jsonPath: .spec.repository + name: repository + type: string + - jsonPath: .spec.version + name: version + type: string + - jsonPath: .spec.controlPaused + name: controlPaused + type: boolean + name: v1alpha1 schema: openAPIV3Schema: - description: DSWorker is the Schema for the dsworkers API properties: apiVersion: description: 'APIVersion defines the versioned schema of this representation @@ -35,14 +47,6 @@ spec: spec: description: DSWorkerSpec defines the desired state of DSWorker properties: - alert_config: - description: AlertConfig is the config of alertService - properties: - port: - type: string - service_url: - type: string - type: object datasource: description: Datasource is the config of database properties: @@ -1419,11 +1423,20 @@ spec: default: 0 description: Replicas is the current size of the cluster type: integer + selector: + description: Selector must be the string form of the selector + type: string + required: + - selector type: object type: object served: true storage: true subresources: + scale: + labelSelectorPath: .status.selector + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas status: {} status: acceptedNames: diff --git a/config/ds/postgreSQL/postgres-service.yaml b/config/ds/postgreSQL/postgres-service.yaml index ab7a73b..1af9894 100644 --- a/config/ds/postgreSQL/postgres-service.yaml +++ b/config/ds/postgreSQL/postgres-service.yaml @@ -6,10 +6,12 @@ metadata: app: postgres namespace: ds spec: - type: LoadBalancer + type: NodePort + #type: LoadBalancer ports: - port: 5432 targetPort: 5432 + nodePort: 30022 protocol: TCP selector: app: postgres \ No newline at end of file diff --git a/config/samples/ds_v1alpha1_dsalert.yaml b/config/samples/ds_v1alpha1_dsalert.yaml index 487d026..56c5647 100644 --- a/config/samples/ds_v1alpha1_dsalert.yaml +++ b/config/samples/ds_v1alpha1_dsalert.yaml @@ -11,6 +11,6 @@ spec: repository: apache/dolphinscheduler-alert-server datasource: drive_name: "org.postgresql.Driver" - url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler" + url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" username: "postgresadmin" password: "admin12345" diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml index a330949..809c853 100644 --- a/config/samples/ds_v1alpha1_dsapi.yaml +++ b/config/samples/ds_v1alpha1_dsapi.yaml @@ -8,10 +8,10 @@ metadata: spec: replicas: 1 version: 3.0.0-alpha - zookeeper_connect: "172.31.29.53:2181" + zookeeper_connect: "172.17.0.5:2181" repository: apache/dolphinscheduler-api datasource: drive_name: "org.postgresql.Driver" - url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler" + url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" username: "postgresadmin" password: "admin12345" \ No newline at end of file diff --git a/config/samples/ds_v1alpha1_dsmaster.yaml b/config/samples/ds_v1alpha1_dsmaster.yaml index f4be4fb..8d0bbae 100644 --- a/config/samples/ds_v1alpha1_dsmaster.yaml +++ b/config/samples/ds_v1alpha1_dsmaster.yaml @@ -6,14 +6,18 @@ metadata: labels: app: ds-master spec: - replicas: 2 - zookeeper_connect: "172.31.29.53:2181" + replicas: 1 + zookeeper_connect: "172.17.0.5:2181" version: 3.0.0-alpha repository: apache/dolphinscheduler-master datasource: drive_name: "org.postgresql.Driver" - url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler" + url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" username: "postgresadmin" password: "admin12345" + hpa: + min_replicas: 1 + max_replicas: 5 + mem_average_utilization: 85 diff --git a/config/samples/ds_v1alpha1_dsworker.yaml b/config/samples/ds_v1alpha1_dsworker.yaml index 7f818b2..92065f0 100644 --- a/config/samples/ds_v1alpha1_dsworker.yaml +++ b/config/samples/ds_v1alpha1_dsworker.yaml @@ -6,12 +6,20 @@ metadata: labels: app: ds-worker spec: - replicas: 3 - zookeeper_connect: "172.31.29.53:2181" + replicas: 1 + zookeeper_connect: "172.17.0.5:2181" version: 3.0.0-alpha repository: apache/dolphinscheduler-worker datasource: drive_name: "org.postgresql.Driver" - url: "jdbc:postgresql://172.31.24.57:5432/dolphinscheduler" + url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" username: "postgresadmin" - password: "admin12345" \ No newline at end of file + password: "admin12345" + pod: + resources: + limits: + cpu: "1000m" + memory: "2Gi" + requests: + cpu: "500m" + memory: "1Gi" diff --git a/controllers/dsalert_controller.go b/controllers/dsalert_controller.go index 2b6d88f..5f254ea 100644 --- a/controllers/dsalert_controller.go +++ b/controllers/dsalert_controller.go @@ -167,7 +167,7 @@ func (r *DSAlertReconciler) ensureAlertService(ctx context.Context, cluster *dsv namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsAlertServiceValue} if err := r.Client.Get(ctx, namespacedName, service); err != nil { // Local cache not found - logger.Info("get service error") + masterLogger.Info("get service error") if apierrors.IsNotFound(err) { service = createAlertService(cluster) if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil { diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go index 4e95511..7fbea79 100644 --- a/controllers/dsapi_controller.go +++ b/controllers/dsapi_controller.go @@ -50,7 +50,6 @@ type DSApiReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by // the DSApi object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. diff --git a/controllers/dsmaster_controller.go b/controllers/dsmaster_controller.go index df05da5..18edab7 100644 --- a/controllers/dsmaster_controller.go +++ b/controllers/dsmaster_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + v2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -40,13 +41,8 @@ import ( dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" ) -const ( - dsMasterLabel = "ds-master" - dsServiceLabel = "ds-operator-service" -) - var ( - logger = ctrl.Log.WithName("DSMaster-controller") + masterLogger = ctrl.Log.WithName("DSMaster-controller") ) // DSMasterReconciler reconciles a DSMaster object @@ -71,8 +67,8 @@ type DSMasterReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger.Info("dmMaster start reconcile logic") - defer logger.Info("dmMaster Reconcile end ---------------------------------------------") + masterLogger.Info("dmMaster start reconcile logic") + defer masterLogger.Info("dmMaster Reconcile end ---------------------------------------------") cluster := &dsv1alpha1.DSMaster{} @@ -118,7 +114,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // If dsmaster-cluster is paused, we do nothing on things changed. // Until dsmaster-cluster is un-paused, we will reconcile to the state of that point. if cluster.Spec.Paused { - logger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name) + masterLogger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name) desired.Status.ControlPaused = true if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { return ctrl.Result{}, err @@ -130,32 +126,39 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // 1. First time we see the ds-master-cluster, initialize it if cluster.Status.Phase == dsv1alpha1.DsPhaseNone { desired.Status.Phase = dsv1alpha1.DsPhaseCreating - logger.Info("phase had been changed from none ---> creating") + masterLogger.Info("phase had been changed from none ---> creating") err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)) return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err } //2 ensure the headless service - logger.Info("Ensuring cluster service") + masterLogger.Info("Ensuring cluster service") if err := r.ensureMasterService(ctx, cluster); err != nil { return ctrl.Result{}, err } - // 3. Ensure bootstrapped, we will block here util cluster is up and healthy - logger.Info("Ensuring cluster members") + //2 ensure the headless service + masterLogger.Info("Ensuring worker hpa") + + if err := r.ensureHPA(ctx, cluster); err != nil { + return ctrl.Result{}, err + } + + // 4. Ensure bootstrapped, we will block here util cluster is up and healthy + masterLogger.Info("Ensuring cluster members") if requeue, err := r.ensureMembers(ctx, cluster); requeue { return ctrl.Result{RequeueAfter: 5 * time.Second}, err } - // 4. Ensure cluster scaled - logger.Info("Ensuring cluster scaled") + // 5. Ensure cluster scaled + masterLogger.Info("Ensuring cluster scaled") if requeue, err := r.ensureScaled(ctx, cluster); requeue { return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err } - // .5 Ensure cluster upgraded - logger.Info("Ensuring cluster upgraded") + // 6. Ensure cluster upgraded + masterLogger.Info("Ensuring cluster upgraded") if requeue, err := r.ensureUpgraded(ctx, cluster); requeue { return ctrl.Result{Requeue: true}, err } @@ -165,7 +168,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - logger.Info("******************************************************") + masterLogger.Info("******************************************************") desired.Status.Phase = dsv1alpha1.DsPhaseNone if err := r.Update(ctx, desired); err != nil { return ctrl.Result{}, err @@ -184,6 +187,7 @@ func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&dsv1alpha1.DSMaster{}). Owns(&corev1.Pod{}). Owns(&corev1.Service{}). + Owns(&v2.HorizontalPodAutoscaler{}). Watches(&source.Channel{Source: r.resyncCh}, &handler.EnqueueRequestForObject{}). // or use WithEventFilter() WithEventFilter(filter). @@ -239,11 +243,11 @@ func (r *DSMasterReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph } func (r *DSMasterReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSMaster) error { - logger.Info("Starting add new member to cluster", "cluster", cluster.Name) - defer logger.Info("End add new member to cluster", "cluster", cluster.Name) + masterLogger.Info("Starting add new member to cluster", "cluster", cluster.Name) + defer masterLogger.Info("End add new member to cluster", "cluster", cluster.Name) // New Pod - pod, err := r.newDSMasterPod(ctx, cluster) + pod, err := r.newDSMasterPod(cluster) if err != nil { return err } @@ -256,7 +260,7 @@ func (r *DSMasterReconciler) createMember(ctx context.Context, cluster *dsv1alph } func (r *DSMasterReconciler) deletePod(ctx context.Context, pod *corev1.Pod) error { - logger.Info("begin delete pod", "pod name", pod.Name) + masterLogger.Info("begin delete pod", "pod name", pod.Name) if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) { return err } @@ -269,7 +273,7 @@ func (r *DSMasterReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1al return false, err } - logger.Info("cluster.Spec.Version", "cluster.Spec.Version", cluster.Spec.Version) + masterLogger.Info("cluster.Spec.Version", "cluster.Spec.Version", cluster.Spec.Version) for _, memset := range ms { if r.predicateUpdate(memset, cluster) { pod := &corev1.Pod{} @@ -289,7 +293,7 @@ func getNeedUpgradePods(ctx context.Context, cli *kubernetes.Clientset, cluster if err != nil { return nil, err } - podAppSelect, err := labels.NewRequirement(dsv1alpha1.DsAppName, selection.Equals, []string{dsMasterLabel}) + podAppSelect, err := labels.NewRequirement(dsv1alpha1.DsAppName, selection.Equals, []string{dsv1alpha1.DsMasterLabel}) if err != nil { return nil, err } @@ -305,10 +309,10 @@ func (r *DSMasterReconciler) ensureMasterService(ctx context.Context, cluster *d // 1. Client service service := &corev1.Service{} - namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsServiceLabelValue} + namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsHeadLessServiceLabel} if err := r.Client.Get(ctx, namespacedName, service); err != nil { // Local cache not found - if apierrors.IsNotFound(err) { + if apierrors.IsNotFound(err) && !apierrors.IsAlreadyExists(err) { service = createMasterService(cluster) if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil { return err @@ -317,6 +321,8 @@ func (r *DSMasterReconciler) ensureMasterService(ctx context.Context, cluster *d if err := r.Client.Create(ctx, service); err != nil { return err } + + r.recorder.Event(cluster, corev1.EventTypeNormal, "ds-operator-service created", "master headless service had been created") } } return nil @@ -391,3 +397,31 @@ func (r *Predicate) Generic(evt event.GenericEvent) bool { func (r *DSMasterReconciler) predicateUpdate(member *Member, cluster *dsv1alpha1.DSMaster) bool { return member.Version != cluster.Spec.Version } + +func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.DSMaster) error { + hpa := &v2.HorizontalPodAutoscaler{} + namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsWorkerHpa} + if err := r.Client.Get(ctx, namespacedName, hpa); err != nil { + // Local cache not found + if apierrors.IsNotFound(err) && cluster.Spec.HpaPolicy != nil { + hpa := r.createHPA(cluster) + if err := controllerutil.SetControllerReference(cluster, hpa, r.Scheme); err != nil { + masterLogger.Info("set controller worker hpa error") + return err + } + // Remote may already exist, so we will return err, for the next time, this code will not execute + if err := r.Client.Create(ctx, hpa); err != nil { + masterLogger.Info("create worker hpa error") + return err + } + } + } + + if &hpa != nil && cluster.Spec.HpaPolicy == nil { + if err := r.deleteHPA(ctx, hpa); err != nil { + masterLogger.Info("delete hpa error") + return err + } + } + return nil +} diff --git a/controllers/dsworker_controller.go b/controllers/dsworker_controller.go index e16abc7..7a9262c 100644 --- a/controllers/dsworker_controller.go +++ b/controllers/dsworker_controller.go @@ -18,35 +18,30 @@ package controllers import ( "context" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "time" dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" ) -const ( - dsWorkerLabel = "ds-worker" - dsWorkerConfig = "ds-worker-config" -) - // DSWorkerReconciler reconciles a DSWorker object type DSWorkerReconciler struct { client.Client - Log logr.Logger Scheme *runtime.Scheme - Recorder record.EventRecorder + recorder record.EventRecorder } var ( - worker_logger = ctrl.Log.WithName("DSWorker-controller") + workerLogger = ctrl.Log.WithName("DSWorker-controller") ) //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers,verbs=get;list;watch;create;update;patch;delete @@ -62,14 +57,14 @@ var ( // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - worker_logger.Info("dmWorker start reconcile logic") - defer worker_logger.Info("dmWorker Reconcile end ---------------------------------------------") + workerLogger.Info("dmWorker start reconcile logic") + defer workerLogger.Info("dmWorker Reconcile end ---------------------------------------------") cluster := &dsv1alpha1.DSWorker{} if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil { if errors.IsNotFound(err) { - r.Recorder.Event(cluster, corev1.EventTypeWarning, "dmWorker is not Found", "dmWorker is not Found") + r.recorder.Event(cluster, corev1.EventTypeWarning, "dmWorker is not Found", "dmWorker is not Found") return ctrl.Result{}, nil } return ctrl.Result{}, err @@ -107,37 +102,45 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // If dsworker-cluster is paused, we do nothing on things changed. // Until dsworker-cluster is un-paused, we will reconcile to the dsworker state of that point. if cluster.Spec.Paused { - worker_logger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name) + workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name) desired.Status.ControlPaused = true if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { return ctrl.Result{}, err } - r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing") + r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing") return ctrl.Result{}, nil } - // 1. First time we see the ds-dsworker-cluster, initialize it + // 1. First time we see the ds-worker-cluster, initialize it if cluster.Status.Phase == dsv1alpha1.DsPhaseNone { + if desired.Status.Selector == "" { + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: LabelForWorkerPod()}) + if err != nil { + masterLogger.Error(err, "Error retrieving selector labels") + return reconcile.Result{}, err + } + desired.Status.Selector = selector.String() + } desired.Status.Phase = dsv1alpha1.DsPhaseCreating - worker_logger.Info("phase had been changed from none ---> creating") + workerLogger.Info("phase had been changed from none ---> creating") err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)) return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err } // 3. Ensure bootstrapped, we will block here util cluster is up and healthy - worker_logger.Info("Ensuring cluster members") + workerLogger.Info("Ensuring cluster members") if requeue, err := r.ensureMembers(ctx, cluster); requeue { return ctrl.Result{RequeueAfter: 5 * time.Second}, err } // 4. Ensure cluster scaled - worker_logger.Info("Ensuring cluster scaled") + workerLogger.Info("Ensuring cluster scaled") if requeue, err := r.ensureScaled(ctx, cluster); requeue { return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, err } // .5 Ensure cluster upgraded - worker_logger.Info("Ensuring cluster upgraded") + workerLogger.Info("Ensuring cluster upgraded") if requeue, err := r.ensureUpgraded(ctx, cluster); requeue { return ctrl.Result{Requeue: true}, err } @@ -147,13 +150,14 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - worker_logger.Info("******************************************************") + workerLogger.Info("******************************************************") desired.Status.Phase = dsv1alpha1.DsPhaseNone return ctrl.Result{Requeue: false}, nil } // SetupWithManager sets up the controller with the Manager. func (r *DSWorkerReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.recorder = mgr.GetEventRecorderFor("worker-controller") return ctrl.NewControllerManagedBy(mgr). For(&dsv1alpha1.DSWorker{}). Owns(&corev1.Pod{}). @@ -185,7 +189,7 @@ func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph if len(ms) < cluster.Spec.Replicas { err = r.createMember(ctx, cluster) if err != nil { - r.Recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-dsworker pod", "the ds-dsworker pod had been created failed") + r.recorder.Event(cluster, corev1.EventTypeWarning, "cannot create the new ds-dsworker pod", "the ds-dsworker pod had been created failed") return true, err } return true, err @@ -197,7 +201,7 @@ func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph member := ms.PickOne() pod.SetName(member.Name) pod.SetNamespace(member.Namespace) - err = r.deleteMember(ctx, pod) + err = r.deleteMember(ctx, pod, cluster) if err != nil { return true, err } @@ -207,9 +211,37 @@ func (r *DSWorkerReconciler) ensureScaled(ctx context.Context, cluster *dsv1alph return false, nil } +func (r *DSWorkerReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) { + + ms, err := r.podMemberSet(ctx, cluster) + if err != nil { + return false, err + } + + for _, memset := range ms { + if memset.Version != cluster.Spec.Version { + pod := &corev1.Pod{} + pod.SetName(memset.Name) + pod.SetNamespace(memset.Namespace) + if err := r.deleteMember(ctx, pod, cluster); err != nil { + return false, err + } + return true, nil + } + } + return false, nil +} + +func (r *DSWorkerReconciler) ensureDSWorkerDeleted(ctx context.Context, cluster *dsv1alpha1.DSWorker) error { + if err := r.Client.Delete(ctx, cluster, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + return err + } + return nil +} + func (r *DSWorkerReconciler) createMember(ctx context.Context, cluster *dsv1alpha1.DSWorker) error { - worker_logger.Info("Starting add new member to cluster", "cluster", cluster.Name) - defer worker_logger.Info("End add new member to cluster", "cluster", cluster.Name) + workerLogger.Info("Starting add new member to cluster", "cluster", cluster.Name) + defer workerLogger.Info("End add new member to cluster", "cluster", cluster.Name) // New Pod pod, err := r.newDSWorkerPod(ctx, cluster) @@ -221,36 +253,26 @@ func (r *DSWorkerReconciler) createMember(ctx context.Context, cluster *dsv1alph if err = r.Client.Create(ctx, pod); err != nil && !apierrors.IsAlreadyExists(err) { return err } - return nil -} - -func (r *DSWorkerReconciler) deleteMember(ctx context.Context, pod *corev1.Pod) error { - worker_logger.Info("begin delete pod", "pod name", pod.Name) - if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) { + desired := cluster.DeepCopy() + desired.Spec.Replicas += 1 + if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { return err } - return nil } -func (r *DSWorkerReconciler) ensureUpgraded(ctx context.Context, cluster *dsv1alpha1.DSWorker) (bool, error) { +func (r *DSWorkerReconciler) deleteMember(ctx context.Context, pod *corev1.Pod, cluster *dsv1alpha1.DSWorker) error { - ms, err := r.podMemberSet(ctx, cluster) - if err != nil { - return false, err + workerLogger.Info("begin delete pod", "pod name", pod.Name) + if err := r.Client.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) { + return err } - for _, memset := range ms { - if memset.Version != cluster.Spec.Version { - pod := &corev1.Pod{} - pod.SetName(memset.Name) - pod.SetNamespace(memset.Namespace) - if err := r.deleteMember(ctx, pod); err != nil { - return false, err - } - return true, nil - } + desired := cluster.DeepCopy() + desired.Spec.Replicas -= 1 + if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { + return err } - return false, nil + return nil } diff --git a/controllers/master_reconcile.go b/controllers/master_reconcile.go index 2c6e722..e480b7f 100644 --- a/controllers/master_reconcile.go +++ b/controllers/master_reconcile.go @@ -19,12 +19,12 @@ package controllers import ( "context" dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" - "errors" + v2 "k8s.io/api/autoscaling/v2" + _ "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "strings" ) func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSMaster) (MemberSet, error) { @@ -32,7 +32,7 @@ func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph pods := &corev1.PodList{} if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace), - client.MatchingLabels(LabelsForCluster(dsMasterLabel))); err != nil { + client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsMasterLabel))); err != nil { return members, err } @@ -55,46 +55,6 @@ func (r *DSMasterReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph return members, nil } -func (r *DSMasterReconciler) currentMemberSet(ctx context.Context, cluster *dsv1alpha1.DSMaster) (MemberSet, error) { - members := MemberSet{} - - // Normally will not happen - ms, ok := cluster.Annotations[dsv1alpha1.ClusterMembersAnnotation] - if !ok || ms == "" { - return members, errors.New("cluster spec has no members annotation") - } - - names := strings.Split(ms, ",") - - pods := &corev1.PodList{} - if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace), - client.MatchingLabels(LabelsForCluster(dsMasterLabel))); err != nil { - return members, err - } - - podMaps := map[string]corev1.Pod{} - for _, pod := range pods.Items { - podMaps[pod.Name] = pod - } - - for _, name := range names { - m := &Member{ - Name: name, - Namespace: cluster.Namespace, - Created: false, - RunningAndReady: false, - } - - if pod, ok := podMaps[name]; ok { - m.Created = true - m.RunningAndReady = IsRunningAndReady(&pod) - m.Version = pod.Labels[dsv1alpha1.DsVersionLabel] - } - members.Add(m) - } - return members, nil -} - func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod { var isSetHostnameAsFQDN bool isSetHostnameAsFQDN = true @@ -103,7 +63,7 @@ func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod { ObjectMeta: metav1.ObjectMeta{ Name: podName, Namespace: cr.Namespace, - Labels: map[string]string{dsv1alpha1.DsAppName: dsMasterLabel, + Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsMasterLabel, dsv1alpha1.DsVersionLabel: cr.Spec.Version, dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue}, }, @@ -151,7 +111,7 @@ func (r *DSMasterReconciler) ensureDSMasterDeleted(ctx context.Context, DSMaster return nil } -func (r *DSMasterReconciler) newDSMasterPod(ctx context.Context, cluster *dsv1alpha1.DSMaster) (*corev1.Pod, error) { +func (r *DSMasterReconciler) newDSMasterPod(cluster *dsv1alpha1.DSMaster) (*corev1.Pod, error) { // Create pod pod := newDSMasterPod(cluster) if err := controllerutil.SetControllerReference(cluster, pod, r.Scheme); err != nil { @@ -166,10 +126,9 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service { labels_ := LabelsForService() service := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: dsv1alpha1.DsServiceLabelValue, - GenerateName: dsv1alpha1.DsServiceLabelValue, - Namespace: cluster.Namespace, - Labels: map[string]string{dsv1alpha1.DsAppName: dsServiceLabel}, + Name: dsv1alpha1.DsHeadLessServiceLabel, + Namespace: cluster.Namespace, + Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsHeadLessServiceLabel}, }, Spec: corev1.ServiceSpec{ Selector: labels_, @@ -179,3 +138,57 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service { } return &service } + +func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.HorizontalPodAutoscaler { + hpa := v2.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: dsv1alpha1.DsWorkerHpa, + Namespace: cluster.Namespace, + ResourceVersion: dsv1alpha1.DSVersion, + }, + Spec: v2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: v2.CrossVersionObjectReference{ + Kind: dsv1alpha1.DsWorkerKind, + Name: dsv1alpha1.DsWorkerLabel, + APIVersion: dsv1alpha1.APIVersion, + }, + MinReplicas: &cluster.Spec.HpaPolicy.MinReplicas, + MaxReplicas: cluster.Spec.HpaPolicy.MaxReplicas, + }, + } + + if cluster.Spec.HpaPolicy.CPUAverageUtilization > 0 { + hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{ + Type: v2.ResourceMetricSourceType, + Resource: &v2.ResourceMetricSource{ + Name: corev1.ResourceCPU, + Target: v2.MetricTarget{ + Type: v2.UtilizationMetricType, + AverageUtilization: &cluster.Spec.HpaPolicy.CPUAverageUtilization, + }, + }, + }) + } + + if cluster.Spec.HpaPolicy.MEMAverageUtilization > 0 { + hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{ + Type: v2.ResourceMetricSourceType, + Resource: &v2.ResourceMetricSource{ + Name: corev1.ResourceMemory, + Target: v2.MetricTarget{ + Type: v2.UtilizationMetricType, + AverageUtilization: &cluster.Spec.HpaPolicy.MEMAverageUtilization, + }, + }, + }) + } + + return &hpa +} + +func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2.HorizontalPodAutoscaler) error { + if err := r.Client.Delete(ctx, hpa, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + return err + } + return nil +} diff --git a/controllers/pod.go b/controllers/pod.go index 35caf01..405c7ac 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -23,16 +23,6 @@ import ( "k8s.io/apimachinery/pkg/labels" ) -const ( - dsLogVolumeName = "ds-log" - dsLogVolumeMountDir = "/opt/dolphinscheduler/logs" - dsShareVolumeName = "ds-soft" - dsShareVolumeMountDir = "/opt/soft" -) - -type PodTemplate struct { -} - func applyPodPolicy(pod *corev1.Pod, policy *dsv1alpha1.PodPolicy) { if policy == nil { return @@ -53,6 +43,7 @@ func applyPodPolicy(pod *corev1.Pod, policy *dsv1alpha1.PodPolicy) { mergeLabels(pod.Labels, policy.Labels) if &policy.Resources != nil { + workerLogger.Info("the resources is ", "resources", policy.Resources) pod.Spec.Containers[0] = containerWithRequirements(pod.Spec.Containers[0], policy.Resources) } @@ -79,6 +70,10 @@ func PodWithNodeSelector(p *corev1.Pod, ns map[string]string) *corev1.Pod { return p } +func LabelForWorkerPod() map[string]string { + return LabelsForCluster(dsv1alpha1.DsWorkerLabel) +} + func LabelsForCluster(lbs string) map[string]string { return labels.Set{dsv1alpha1.DsAppName: lbs} } @@ -89,11 +84,11 @@ func LabelsForService() map[string]string { // AddLogVolumeToPod abstract the process of appending volume spec to pod spec func AddLogVolumeToPod(pod *corev1.Pod, pvcName string) { - vol := corev1.Volume{Name: dsLogVolumeName} + vol := corev1.Volume{Name: dsv1alpha1.DsLogVolumeName} vom := corev1.VolumeMount{ - Name: dsLogVolumeName, - MountPath: dsLogVolumeMountDir, + Name: dsv1alpha1.DsLogVolumeName, + MountPath: dsv1alpha1.DsLogVolumeMountDir, SubPath: pod.Name, } @@ -112,11 +107,11 @@ func AddLogVolumeToPod(pod *corev1.Pod, pvcName string) { // AddLibVolumeToPod abstract the process of appending volume /opt/soft spec to pod spec,it is shared by all worker nodes,and it is read only // Suggest to mount a share volume in production env directly func AddLibVolumeToPod(pod *corev1.Pod, pvcName string) { - vol := corev1.Volume{Name: dsShareVolumeName} + vol := corev1.Volume{Name: dsv1alpha1.DsShareVolumeName} vom := corev1.VolumeMount{ - Name: dsShareVolumeName, - MountPath: dsShareVolumeMountDir, + Name: dsv1alpha1.DsShareVolumeName, + MountPath: dsv1alpha1.DsShareVolumeMountDir, ReadOnly: true, } diff --git a/controllers/worker_reconcile.go b/controllers/worker_reconcile.go index 8ffe129..d0ce14e 100644 --- a/controllers/worker_reconcile.go +++ b/controllers/worker_reconcile.go @@ -19,12 +19,10 @@ package controllers import ( "context" dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" - "errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "strings" ) func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alpha1.DSWorker) (MemberSet, error) { @@ -32,7 +30,7 @@ func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph pods := &corev1.PodList{} if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace), - client.MatchingLabels(LabelsForCluster(dsWorkerLabel))); err != nil { + client.MatchingLabels(LabelsForCluster(dsv1alpha1.DsWorkerLabel))); err != nil { return members, err } @@ -55,53 +53,13 @@ func (r *DSWorkerReconciler) podMemberSet(ctx context.Context, cluster *dsv1alph return members, nil } -func (r *DSWorkerReconciler) currentMemberSet(ctx context.Context, cluster *dsv1alpha1.DSWorker) (MemberSet, error) { - members := MemberSet{} - - // Normally will not happen - ms, ok := cluster.Annotations[dsv1alpha1.ClusterMembersAnnotation] - if !ok || ms == "" { - return members, errors.New("cluster spec has no members annotation") - } - - names := strings.Split(ms, ",") - - pods := &corev1.PodList{} - if err := r.Client.List(ctx, pods, client.InNamespace(cluster.Namespace), - client.MatchingLabels(LabelsForCluster(dsWorkerLabel))); err != nil { - return members, err - } - - podMaps := map[string]corev1.Pod{} - for _, pod := range pods.Items { - podMaps[pod.Name] = pod - } - - for _, name := range names { - m := &Member{ - Name: name, - Namespace: cluster.Namespace, - Created: false, - RunningAndReady: false, - } - - if pod, ok := podMaps[name]; ok { - m.Created = true - m.RunningAndReady = IsRunningAndReady(&pod) - m.Version = pod.Labels[dsv1alpha1.DsVersionLabel] - } - members.Add(m) - } - return members, nil -} - func newDSWorkerPod(cr *dsv1alpha1.DSWorker) *corev1.Pod { var podName = cr.Name + "-pod" + dsv1alpha1.RandStr(6) return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, Namespace: cr.Namespace, - Labels: map[string]string{dsv1alpha1.DsAppName: dsWorkerLabel, + Labels: map[string]string{dsv1alpha1.DsAppName: dsv1alpha1.DsWorkerLabel, dsv1alpha1.DsVersionLabel: cr.Spec.Version, dsv1alpha1.DsServiceLabel: dsv1alpha1.DsServiceLabelValue, }, @@ -146,13 +104,6 @@ func newDSWorkerPod(cr *dsv1alpha1.DSWorker) *corev1.Pod { } } -func (r *DSWorkerReconciler) ensureDSWorkerDeleted(ctx context.Context, dsWorker *dsv1alpha1.DSWorker) error { - if err := r.Client.Delete(ctx, dsWorker, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { - return err - } - return nil -} - func (r *DSWorkerReconciler) newDSWorkerPod(ctx context.Context, cluster *dsv1alpha1.DSWorker) (*corev1.Pod, error) { // Create pod pod := newDSWorkerPod(cluster)
