This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git
commit 2bcaee13795c166b3403ad8ba6b30dd2fc226725 Author: nobolity <[email protected]> AuthorDate: Tue Jun 14 14:27:56 2022 +0800 fix(all): merge the version of hpa and update the type of deleteResource --- config/manager/kustomization.yaml | 4 +-- config/rbac/role.yaml | 22 ++++++++++++ config/samples/ds_v1alpha1_dsapi.yaml | 1 + config/samples/ds_v1alpha1_dsmaster.yaml | 8 ++--- controllers/dsalert_controller.go | 3 +- controllers/dsapi_controller.go | 3 +- controllers/dsmaster_controller.go | 59 ++++++++++++++++---------------- controllers/dsworker_controller.go | 25 +++++++++++--- controllers/master_reconcile.go | 34 +++++++++--------- main.go | 8 +++-- 10 files changed, 106 insertions(+), 61 deletions(-) diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index dd1052d..c2c4746 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -12,5 +12,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: kezhenxu94/controller - newTag: latest + newName: nobolity/ds-operator + newTag: v1alpha1 diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index b8bf865..35f12b4 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -5,6 +5,16 @@ metadata: creationTimestamp: null name: manager-role rules: +- apiGroups: + - "" + resources: + - persistentvolumeclaims + verbs: + - create + - delete + - get + - list + - watch - apiGroups: - "" resources: @@ -41,6 +51,18 @@ rules: - patch - update - watch +- apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - ds.apache.dolphinscheduler.dev resources: diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml index 809c853..fef6f69 100644 --- a/config/samples/ds_v1alpha1_dsapi.yaml +++ b/config/samples/ds_v1alpha1_dsapi.yaml @@ -10,6 +10,7 @@ spec: version: 3.0.0-alpha zookeeper_connect: "172.17.0.5:2181" repository: apache/dolphinscheduler-api + node_port: 30002 datasource: drive_name: "org.postgresql.Driver" url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" diff --git a/config/samples/ds_v1alpha1_dsmaster.yaml b/config/samples/ds_v1alpha1_dsmaster.yaml index 8d0bbae..de71b27 100644 --- a/config/samples/ds_v1alpha1_dsmaster.yaml +++ b/config/samples/ds_v1alpha1_dsmaster.yaml @@ -15,9 +15,9 @@ spec: url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" username: "postgresadmin" password: "admin12345" - hpa: - min_replicas: 1 - max_replicas: 5 - mem_average_utilization: 85 +# hpa: +# min_replicas: 1 +# max_replicas: 5 +# mem_average_utilization: 85 diff --git a/controllers/dsalert_controller.go b/controllers/dsalert_controller.go index f3179a0..4a57bdc 100644 --- a/controllers/dsalert_controller.go +++ b/controllers/dsalert_controller.go @@ -150,6 +150,7 @@ func (r *DSAlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // SetupWithManager sets up the controller with the Manager. func (r *DSAlertReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.Recorder = mgr.GetEventRecorderFor("alert-controller") return ctrl.NewControllerManagedBy(mgr). For(&dsv1alpha1.DSAlert{}). Owns(&v1.Deployment{}). @@ -158,7 +159,7 @@ func (r *DSAlertReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *DSAlertReconciler) ensureDSAlertDeleted(ctx context.Context, DSAlert *dsv1alpha1.DSAlert) error { - if err := r.Client.Delete(ctx, DSAlert, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + if err := r.Client.Delete(ctx, DSAlert, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil { return err } return nil diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go index 3612326..939ebd1 100644 --- a/controllers/dsapi_controller.go +++ b/controllers/dsapi_controller.go @@ -148,6 +148,7 @@ func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // SetupWithManager sets up the controller with the Manager. func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error { + r.Recorder = mgr.GetEventRecorderFor("api-controller") return ctrl.NewControllerManagedBy(mgr). For(&dsv1alpha1.DSApi{}). Owns(&v1.Deployment{}). @@ -156,7 +157,7 @@ func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error { - if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil { return err } return nil diff --git a/controllers/dsmaster_controller.go b/controllers/dsmaster_controller.go index 01ca5b7..c5013fa 100644 --- a/controllers/dsmaster_controller.go +++ b/controllers/dsmaster_controller.go @@ -18,12 +18,11 @@ package controllers import ( "context" - "sync" + "k8s.io/api/autoscaling/v2beta2" "time" - v2 "k8s.io/api/autoscaling/v2" + dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -36,10 +35,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/source" - - dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" ) var ( @@ -51,18 +46,14 @@ type DSMasterReconciler struct { client.Client Scheme *runtime.Scheme recorder record.EventRecorder - clusters sync.Map - resyncCh chan event.GenericEvent } //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsmasters/finalizers,verbs=update -// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;create;delete;list;watch +//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete @@ -81,11 +72,7 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c cluster := &dsv1alpha1.DSMaster{} if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil { - if errors.IsNotFound(err) { - r.recorder.Event(cluster, corev1.EventTypeWarning, "dsMaster is not Found", "dsMaster is not Found") - return ctrl.Result{}, nil - } - return ctrl.Result{}, err + return ctrl.Result{}, client.IgnoreNotFound(err) } desired := cluster.DeepCopy() @@ -125,9 +112,14 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c masterLogger.Info("ds-master control has been paused: ", "ds-master-name", cluster.Name) desired.Status.ControlPaused = true if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { - return ctrl.Result{}, err + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } else { + masterLogger.Error(err, "unexpected error when master update status in paused") + return ctrl.Result{}, err + } } - r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing") + r.recorder.Event(cluster, corev1.EventTypeNormal, "the master spec status is paused", "do nothing") return ctrl.Result{}, nil } @@ -135,8 +127,15 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if cluster.Status.Phase == dsv1alpha1.DsPhaseNone { desired.Status.Phase = dsv1alpha1.DsPhaseCreating masterLogger.Info("phase had been changed from none ---> creating") - err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)) - return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err + if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { + + if apierrors.IsConflict(err) { + return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err + } else { + masterLogger.Error(err, "unexpected error when master update status in creating") + return ctrl.Result{}, err + } + } } //2 ensure the headless service @@ -173,7 +172,12 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c desired.Status.Phase = dsv1alpha1.DsPhaseFinished if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { - return ctrl.Result{}, err + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } else { + masterLogger.Error(err, "unexpected error when master update status in finished") + return ctrl.Result{}, err + } } masterLogger.Info("******************************************************") @@ -186,8 +190,6 @@ func (r *DSMasterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // SetupWithManager sets up the controller with the Manager. func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.clusters = sync.Map{} - r.resyncCh = make(chan event.GenericEvent) r.recorder = mgr.GetEventRecorderFor("master-controller") filter := &Predicate{} @@ -195,8 +197,7 @@ func (r *DSMasterReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&dsv1alpha1.DSMaster{}). Owns(&corev1.Pod{}). Owns(&corev1.Service{}). - Owns(&v2.HorizontalPodAutoscaler{}). - Watches(&source.Channel{Source: r.resyncCh}, &handler.EnqueueRequestForObject{}). + Owns(&v2beta2.HorizontalPodAutoscaler{}). // or use WithEventFilter() WithEventFilter(filter). Complete(r) @@ -407,7 +408,7 @@ func (r *DSMasterReconciler) predicateUpdate(member *Member, cluster *dsv1alpha1 } func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1.DSMaster) error { - hpa := &v2.HorizontalPodAutoscaler{} + hpa := &v2beta2.HorizontalPodAutoscaler{} namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsWorkerHpa} if err := r.Client.Get(ctx, namespacedName, hpa); err != nil { // Local cache not found @@ -425,7 +426,7 @@ func (r *DSMasterReconciler) ensureHPA(ctx context.Context, cluster *dsv1alpha1. } } - if &hpa != nil && cluster.Spec.HpaPolicy == nil { + if hpa.Kind != "" && cluster.Spec.HpaPolicy == nil { if err := r.deleteHPA(ctx, hpa); err != nil { masterLogger.Info("delete hpa error") return err diff --git a/controllers/dsworker_controller.go b/controllers/dsworker_controller.go index dc3ddeb..3d6dafb 100644 --- a/controllers/dsworker_controller.go +++ b/controllers/dsworker_controller.go @@ -49,6 +49,7 @@ var ( //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsworkers/finalizers,verbs=update //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;create;delete;list;watch //+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete @@ -109,7 +110,12 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c workerLogger.Info("ds-worker control has been paused: ", "ds-worker-name", cluster.Name) desired.Status.ControlPaused = true if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { - return ctrl.Result{}, err + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } else { + masterLogger.Error(err, "unexpected error when worker update status in paused") + return ctrl.Result{}, err + } } r.recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing") return ctrl.Result{}, nil @@ -127,8 +133,14 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } desired.Status.Phase = dsv1alpha1.DsPhaseCreating workerLogger.Info("phase had been changed from none ---> creating") - err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)) - return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err + if err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err + } else { + masterLogger.Error(err, "unexpected error when worker update status in creating") + return ctrl.Result{}, err + } + } } // 3. Ensure bootstrapped, we will block here util cluster is up and healthy @@ -151,7 +163,12 @@ func (r *DSWorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c desired.Status.Phase = dsv1alpha1.DsPhaseFinished if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { - return ctrl.Result{}, err + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } else { + masterLogger.Error(err, "unexpected error when worker update status in finished") + return ctrl.Result{}, err + } } workerLogger.Info("******************************************************") diff --git a/controllers/master_reconcile.go b/controllers/master_reconcile.go index 7a3fb4d..f11a464 100644 --- a/controllers/master_reconcile.go +++ b/controllers/master_reconcile.go @@ -19,8 +19,8 @@ package controllers import ( "context" dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" + "k8s.io/api/autoscaling/v2beta2" - v2 "k8s.io/api/autoscaling/v2" _ "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -106,7 +106,7 @@ func newDSMasterPod(cr *dsv1alpha1.DSMaster) *corev1.Pod { } func (r *DSMasterReconciler) ensureDSMasterDeleted(ctx context.Context, DSMaster *dsv1alpha1.DSMaster) error { - if err := r.Client.Delete(ctx, DSMaster, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + if err := r.Client.Delete(ctx, DSMaster, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil { return err } return nil @@ -140,15 +140,15 @@ func createMasterService(cluster *dsv1alpha1.DSMaster) *corev1.Service { return &service } -func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.HorizontalPodAutoscaler { - hpa := v2.HorizontalPodAutoscaler{ +func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2beta2.HorizontalPodAutoscaler { + hpa := v2beta2.HorizontalPodAutoscaler{ ObjectMeta: metav1.ObjectMeta{ Name: dsv1alpha1.DsWorkerHpa, Namespace: cluster.Namespace, ResourceVersion: dsv1alpha1.DSVersion, }, - Spec: v2.HorizontalPodAutoscalerSpec{ - ScaleTargetRef: v2.CrossVersionObjectReference{ + Spec: v2beta2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: v2beta2.CrossVersionObjectReference{ Kind: dsv1alpha1.DsWorkerKind, Name: dsv1alpha1.DsWorkerLabel, APIVersion: dsv1alpha1.APIVersion, @@ -159,12 +159,12 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon } if cluster.Spec.HpaPolicy.CPUAverageUtilization > 0 { - hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{ - Type: v2.ResourceMetricSourceType, - Resource: &v2.ResourceMetricSource{ + hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{ + Type: v2beta2.ResourceMetricSourceType, + Resource: &v2beta2.ResourceMetricSource{ Name: corev1.ResourceCPU, - Target: v2.MetricTarget{ - Type: v2.UtilizationMetricType, + Target: v2beta2.MetricTarget{ + Type: v2beta2.UtilizationMetricType, AverageUtilization: &cluster.Spec.HpaPolicy.CPUAverageUtilization, }, }, @@ -172,12 +172,12 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon } if cluster.Spec.HpaPolicy.MEMAverageUtilization > 0 { - hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2.MetricSpec{ - Type: v2.ResourceMetricSourceType, - Resource: &v2.ResourceMetricSource{ + hpa.Spec.Metrics = append(hpa.Spec.Metrics, v2beta2.MetricSpec{ + Type: v2beta2.ResourceMetricSourceType, + Resource: &v2beta2.ResourceMetricSource{ Name: corev1.ResourceMemory, - Target: v2.MetricTarget{ - Type: v2.UtilizationMetricType, + Target: v2beta2.MetricTarget{ + Type: v2beta2.UtilizationMetricType, AverageUtilization: &cluster.Spec.HpaPolicy.MEMAverageUtilization, }, }, @@ -187,7 +187,7 @@ func (r *DSMasterReconciler) createHPA(cluster *dsv1alpha1.DSMaster) *v2.Horizon return &hpa } -func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2.HorizontalPodAutoscaler) error { +func (r *DSMasterReconciler) deleteHPA(ctx context.Context, hpa *v2beta2.HorizontalPodAutoscaler) error { if err := r.Client.Delete(ctx, hpa, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { return err } diff --git a/main.go b/main.go index 1661e0c..d7f2398 100644 --- a/main.go +++ b/main.go @@ -106,9 +106,11 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "DSApi") os.Exit(1) } - if err = (&dsv1alpha1.DSMaster{}).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "DSMaster") - os.Exit(1) + if os.Getenv("ENABLE_WEBHOOKS") != "false" { + if err = (&dsv1alpha1.DSMaster{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "DSMaster") + os.Exit(1) + } } //+kubebuilder:scaffold:builder
