This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git
commit c87b0c0a0e64815fa4706845d7db0091733b816e Author: nobolity <[email protected]> AuthorDate: Sun May 29 21:52:19 2022 +0800 feat(operator): add api --- api/v1alpha1/dsapi_types.go | 120 +++++++++++ .../ds.apache.dolphinscheduler.dev_dsapis.yaml | 56 ++++++ config/crd/patches/cainjection_in_dsapis.yaml | 7 + config/crd/patches/webhook_in_dsapis.yaml | 16 ++ config/rbac/dsapi_editor_role.yaml | 24 +++ config/rbac/dsapi_viewer_role.yaml | 20 ++ config/samples/ds_v1alpha1_dsapi.yaml | 17 ++ controllers/dsapi_controller.go | 221 +++++++++++++++++++++ 8 files changed, 481 insertions(+) diff --git a/api/v1alpha1/dsapi_types.go b/api/v1alpha1/dsapi_types.go new file mode 100644 index 0000000..01f11f9 --- /dev/null +++ b/api/v1alpha1/dsapi_types.go @@ -0,0 +1,120 @@ +/* +Copyright 2022. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// DSApiSpec defines the desired state of DSApi +type DSApiSpec struct { + Datasource *DateSourceTemplate `json:"datasource"` + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + + // Version is the expected version of the ds cluster. + // The ds-operator will eventually make the ds cluster version + // equal to the expected version. + // If version is not set, default is "3.0.0-alpha". + // +kubebuilder:default="3.0.0-alpha" + Version string `json:"version,omitempty"` + + // Repository is the name of the repository that hosts + // ds container images. It should be direct clone of the repository in official + // By default, it is `apache/dolphinscheduler-master`. + // +kubebuilder:default=apache/dolphinscheduler-master + Repository string `json:"repository,omitempty"` + + // Replicas is the expected size of the ms-master. + // The ds-master-operator will eventually make the size of the running + // equal to the expected size. + // The vaild range of the size is from 1 to 7. + // +kubebuilder:default=3 + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=7 + Replicas int `json:"replicas"` + + // Pod defines the policy to create pod for the dm-master pod. + // Updating Pod does not take effect on any existing dm-master pods. + Pod *PodPolicy `json:"pod,omitempty"` + + // Paused is to pause the control of the operator for the ds-master . + // +kubebuilder:default=false + Paused bool `json:"paused,omitempty"` + + //LogPvcName defines the log capacity of application ,the position is /opt/dolphinscheduler/logs eg 20Gi + LogPvcName string `json:"log_pvc_name,omitempty"` + + //ReGenerate defines if delete the old_deployment and create a new deployment + // +kubebuilder:default=false + ReGenerate bool `json:"re_generate,omitempty"` + + //NodePort is the port node exposed + NodePort int32 `json:"node_port"` +} + +// DSApiStatus defines the observed state of DSApi +type DSApiStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + // Phase is the cluster running phase + // +kubebuilder:validation:Enum="";Creating;Running;Failed;Finished + Phase DsPhase `json:"phase,omitempty"` + // ControlPaused indicates the operator pauses the control of the cluster. + // +kubebuilder:default=false + ControlPaused bool `json:"controlPaused,omitempty"` + + // Condition keeps track of all cluster conditions, if they exist. + Conditions []DsCondition `json:"conditions,omitempty"` + + // Replicas is the current size of the cluster + // +kubebuilder:default=0 + Replicas int `json:"replicas,omitempty"` + + // Members are the dsMaster members in the cluster + Members MembersStatus `json:"members,omitempty"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status + +// DSApi is the Schema for the dsapis API +type DSApi struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec DSApiSpec `json:"spec,omitempty"` + Status DSApiStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// DSApiList contains a list of DSApi +type DSApiList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []DSApi `json:"items"` +} + +func init() { + SchemeBuilder.Register(&DSApi{}, &DSApiList{}) +} diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml new file mode 100644 index 0000000..5f98244 --- /dev/null +++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml @@ -0,0 +1,56 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.8.0 + creationTimestamp: null + name: dsapis.ds.apache.dolphinscheduler.dev +spec: + group: ds.apache.dolphinscheduler.dev + names: + kind: DSApi + listKind: DSApiList + plural: dsapis + singular: dsapi + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: DSApi is the Schema for the dsapis API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: DSApiSpec defines the desired state of DSApi + properties: + foo: + description: Foo is an example field of DSApi. Edit dsapi_types.go + to remove/update + type: string + type: object + status: + description: DSApiStatus defines the observed state of DSApi + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/config/crd/patches/cainjection_in_dsapis.yaml b/config/crd/patches/cainjection_in_dsapis.yaml new file mode 100644 index 0000000..c11123f --- /dev/null +++ b/config/crd/patches/cainjection_in_dsapis.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: dsapis.ds.apache.dolphinscheduler.dev diff --git a/config/crd/patches/webhook_in_dsapis.yaml b/config/crd/patches/webhook_in_dsapis.yaml new file mode 100644 index 0000000..937789c --- /dev/null +++ b/config/crd/patches/webhook_in_dsapis.yaml @@ -0,0 +1,16 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: dsapis.ds.apache.dolphinscheduler.dev +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert + conversionReviewVersions: + - v1 diff --git a/config/rbac/dsapi_editor_role.yaml b/config/rbac/dsapi_editor_role.yaml new file mode 100644 index 0000000..c7c0e68 --- /dev/null +++ b/config/rbac/dsapi_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit dsapis. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: dsapi-editor-role +rules: +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsapis + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsapis/status + verbs: + - get diff --git a/config/rbac/dsapi_viewer_role.yaml b/config/rbac/dsapi_viewer_role.yaml new file mode 100644 index 0000000..acf5226 --- /dev/null +++ b/config/rbac/dsapi_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view dsapis. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: dsapi-viewer-role +rules: +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsapis + verbs: + - get + - list + - watch +- apiGroups: + - ds.apache.dolphinscheduler.dev + resources: + - dsapis/status + verbs: + - get diff --git a/config/samples/ds_v1alpha1_dsapi.yaml b/config/samples/ds_v1alpha1_dsapi.yaml new file mode 100644 index 0000000..e3264d7 --- /dev/null +++ b/config/samples/ds_v1alpha1_dsapi.yaml @@ -0,0 +1,17 @@ +apiVersion: ds.apache.dolphinscheduler.dev/v1alpha1 +kind: DSApi +metadata: + name: ds-api + namespace: ds + labels: + app: ds-api +spec: + replicas: 1 + version: 3.0.0-alpha + repository: apache/dolphinscheduler-api + node_port:30002 + datasource: + drive_name: "org.postgresql.Driver" + url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler" + username: "postgresadmin" + password: "admin12345" \ No newline at end of file diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go new file mode 100644 index 0000000..8d74913 --- /dev/null +++ b/controllers/dsapi_controller.go @@ -0,0 +1,221 @@ +/* +Copyright 2022. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "time" +) + +var ( + apiLogger = ctrl.Log.WithName("DSApi-controller") +) + +// DSApiReconciler reconciles a DSApi object +type DSApiReconciler struct { + client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the DSApi object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + apiLogger.Info("dmApi start reconcile logic") + defer apiLogger.Info("dmApi Reconcile end ---------------------------------------------") + + cluster := &dsv1alpha1.DSApi{} + + if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil { + if errors.IsNotFound(err) { + r.Recorder.Event(cluster, corev1.EventTypeWarning, "dsApi is not Found", "dsApi is not Found") + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + desired := cluster.DeepCopy() + + // Handler finalizer + // examine DeletionTimestamp to determine if object is under deletion + if cluster.ObjectMeta.DeletionTimestamp.IsZero() { + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. This is equivalent + // registering our finalizer. + if !controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) { + controllerutil.AddFinalizer(desired, dsv1alpha1.FinalizerName) + if err := r.Update(ctx, desired); err != nil { + return ctrl.Result{}, err + } + } + } else { + // The object is being deleted + + if controllerutil.ContainsFinalizer(desired, dsv1alpha1.FinalizerName) { + // our finalizer is present, so lets handle any external dependency + if err := r.ensureDSApiDeleted(ctx, cluster); err != nil { + return ctrl.Result{}, err + } + + // remove our finalizer from the list and update it. + controllerutil.RemoveFinalizer(desired, dsv1alpha1.FinalizerName) + if err := r.Update(ctx, desired); err != nil { + return ctrl.Result{}, err + } + } + // Stop reconciliation as the item is being deleted + return ctrl.Result{}, nil + } + + if cluster.Spec.Paused { + apiLogger.Info("ds-Api control has been paused: ", "ds-Api-name", cluster.Name) + desired.Status.ControlPaused = true + if err := r.Status().Patch(ctx, desired, client.MergeFrom(cluster)); err != nil { + return ctrl.Result{}, err + } + r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec status is paused", "do nothing") + return ctrl.Result{}, nil + } + + // 1. First time we see the ds-master-cluster, initialize it + if cluster.Status.Phase == dsv1alpha1.DsPhaseNone { + desired.Status.Phase = dsv1alpha1.DsPhaseCreating + apiLogger.Info("phase had been changed from none ---> creating") + err := r.Client.Status().Patch(ctx, desired, client.MergeFrom(cluster)) + return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err + } + + //2 ensure the Api service + apiLogger.Info("Ensuring Api service") + + if err := r.ensureApiService(ctx, cluster); err != nil { + return ctrl.Result{Requeue: true}, nil + } + + if requeue, err := r.ensureApiDeployment(ctx, cluster); err != nil { + return ctrl.Result{Requeue: false}, err + } else { + if !requeue { + return ctrl.Result{Requeue: false}, nil + } + } + + apiLogger.Info("******************************************************") + desired.Status.Phase = dsv1alpha1.DsPhaseNone + if err := r.Update(ctx, desired); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{Requeue: false}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&dsv1alpha1.DSApi{}). + Owns(&v1.Deployment{}). + Owns(&corev1.Service{}). + Owns(&corev1.Pod{}). + Complete(r) +} + +func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error { + if err := r.Client.Delete(ctx, DSApi, client.PropagationPolicy(metav1.DeletePropagationOrphan)); err != nil { + return err + } + return nil +} + +func (r *DSApiReconciler) ensureApiService(ctx context.Context, cluster *dsv1alpha1.DSApi) error { + // 1. Client service + service := &corev1.Service{} + namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiServiceValue} + if err := r.Client.Get(ctx, namespacedName, service); err != nil { + // Local cache not found + logger.Info("get service error") + if apierrors.IsNotFound(err) { + service = createApiService(cluster) + if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil { + logger.Info("create Api service error") + return err + } + // Remote may already exist, so we will return err, for the next time, this code will not execute + if err := r.Client.Create(ctx, service); err != nil { + logger.Info("create Api service error1") + return err + } + logger.Info("the Api service had been created") + } + } + return nil +} + +func (r *DSApiReconciler) ensureApiDeployment(ctx context.Context, cluster *dsv1alpha1.DSApi) (bool, error) { + deployment := &v1.Deployment{} + deploymentNamespaceName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiDeploymentValue} + if err := r.Client.Get(ctx, deploymentNamespaceName, deployment); err != nil { + if apierrors.IsNotFound(err) { + deployment = createApiDeployment(cluster) + } + if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil { + return true, err + } + if err := r.Client.Create(ctx, deployment); err == nil { + return false, nil + } else { + return true, err + } + } else { + err := r.updateApiDeployment(ctx, deployment, cluster) + if err != nil { + return false, err + } + } + + return true, nil +} + +//only notice the property of replicas and image and version +func (r *DSApiReconciler) updateApiDeployment(ctx context.Context, deployment *v1.Deployment, cluster *dsv1alpha1.DSApi) error { + deployment.Spec.Replicas = int32Ptr(int32(cluster.Spec.Replicas)) + deployment.Spec.Template.Spec.Containers[0].Image = ImageName(cluster.Spec.Repository, cluster.Spec.Version) + if err := r.Client.Update(ctx, deployment); err != nil { + return err + } + return nil +}
