This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-operator.git

commit c87b0c0a0e64815fa4706845d7db0091733b816e
Author: nobolity <[email protected]>
AuthorDate: Sun May 29 21:52:19 2022 +0800

    feat(operator): add api
---
 api/v1alpha1/dsapi_types.go                        | 120 +++++++++++
 .../ds.apache.dolphinscheduler.dev_dsapis.yaml     |  56 ++++++
 config/crd/patches/cainjection_in_dsapis.yaml      |   7 +
 config/crd/patches/webhook_in_dsapis.yaml          |  16 ++
 config/rbac/dsapi_editor_role.yaml                 |  24 +++
 config/rbac/dsapi_viewer_role.yaml                 |  20 ++
 config/samples/ds_v1alpha1_dsapi.yaml              |  17 ++
 controllers/dsapi_controller.go                    | 221 +++++++++++++++++++++
 8 files changed, 481 insertions(+)

diff --git a/api/v1alpha1/dsapi_types.go b/api/v1alpha1/dsapi_types.go
new file mode 100644
index 0000000..01f11f9
--- /dev/null
+++ b/api/v1alpha1/dsapi_types.go
@@ -0,0 +1,120 @@
+/*
+Copyright 2022.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
+// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.
+
+// DSApiSpec defines the desired state of DSApi
+type DSApiSpec struct {
+       // Datasource holds the datasource settings, see DateSourceTemplate.
+       Datasource *DateSourceTemplate `json:"datasource"`
+       // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
+       // Important: Run "make" to regenerate code after modifying this file
+
+       // Version is the expected version of the ds cluster.
+       // The ds-operator will eventually make the ds cluster version
+       // equal to the expected version.
+       // If version is not set, default is "3.0.0-alpha".
+       // +kubebuilder:default="3.0.0-alpha"
+       Version string `json:"version,omitempty"`
+
+       // NOTE(review): the default repository below names the master image, while
+       // the DSApi sample manifest uses apache/dolphinscheduler-api - confirm
+       // which default is intended for this type.
+
+       // Repository is the name of the repository that hosts
+       // ds container images. It should be a direct clone of the official repository.
+       // By default, it is `apache/dolphinscheduler-master`.
+       // +kubebuilder:default=apache/dolphinscheduler-master
+       Repository string `json:"repository,omitempty"`
+
+       // Replicas is the expected number of replicas of the ds-api deployment.
+       // The operator will eventually make the running size
+       // equal to the expected size.
+       // The valid range of the size is from 1 to 7.
+       // +kubebuilder:default=3
+       // +kubebuilder:validation:Minimum=1
+       // +kubebuilder:validation:Maximum=7
+       Replicas int `json:"replicas"`
+
+       // Pod defines the policy to create the ds-api pods.
+       // Updating Pod does not take effect on any existing pods.
+       Pod *PodPolicy `json:"pod,omitempty"`
+
+       // Paused is to pause the control of the operator for this DSApi.
+       // +kubebuilder:default=false
+       Paused bool `json:"paused,omitempty"`
+
+       // NOTE(review): the field below is named like a PVC name but is described
+       // as a capacity (e.g. 20Gi) - confirm which one it carries.
+
+       // LogPvcName defines the log capacity of the application; the log position is
+       // /opt/dolphinscheduler/logs, e.g. 20Gi
+       LogPvcName string `json:"log_pvc_name,omitempty"`
+
+       // ReGenerate defines whether to delete the old deployment and create a new one.
+       // +kubebuilder:default=false
+       ReGenerate bool `json:"re_generate,omitempty"`
+
+       // NodePort is the node port to expose.
+       NodePort int32 `json:"node_port"`
+}
+
+// DSApiStatus defines the observed state of DSApi
+type DSApiStatus struct {
+       // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+       // Important: Run "make" to regenerate code after modifying this file
+
+       // Phase is the cluster running phase
+       // +kubebuilder:validation:Enum="";Creating;Running;Failed;Finished
+       Phase DsPhase `json:"phase,omitempty"`
+       // ControlPaused indicates the operator pauses the control of the cluster.
+       // +kubebuilder:default=false
+       ControlPaused bool `json:"controlPaused,omitempty"`
+
+       // Conditions keeps track of all cluster conditions, if they exist.
+       Conditions []DsCondition `json:"conditions,omitempty"`
+
+       // Replicas is the current size of the cluster
+       // +kubebuilder:default=0
+       Replicas int `json:"replicas,omitempty"`
+
+       // Members are the members in the cluster, see MembersStatus.
+       Members MembersStatus `json:"members,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+//+kubebuilder:subresource:status
+
+// DSApi is the Schema for the dsapis API.
+// It carries the desired state in Spec and the observed state in Status.
+type DSApi struct {
+       metav1.TypeMeta   `json:",inline"`
+       metav1.ObjectMeta `json:"metadata,omitempty"`
+
+       Spec   DSApiSpec   `json:"spec,omitempty"`
+       Status DSApiStatus `json:"status,omitempty"`
+}
+
+//+kubebuilder:object:root=true
+
+// DSApiList contains a list of DSApi.
+// Items holds the DSApi objects; the embedded metadata is the list metadata.
+type DSApiList struct {
+       metav1.TypeMeta `json:",inline"`
+       metav1.ListMeta `json:"metadata,omitempty"`
+       Items           []DSApi `json:"items"`
+}
+
+// init registers the DSApi and DSApiList types with the package SchemeBuilder.
+func init() {
+       SchemeBuilder.Register(&DSApi{}, &DSApiList{})
+}
diff --git a/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml 
b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml
new file mode 100644
index 0000000..5f98244
--- /dev/null
+++ b/config/crd/bases/ds.apache.dolphinscheduler.dev_dsapis.yaml
@@ -0,0 +1,56 @@
+---
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  annotations:
+    controller-gen.kubebuilder.io/version: v0.8.0
+  creationTimestamp: null
+  name: dsapis.ds.apache.dolphinscheduler.dev
+spec:
+  group: ds.apache.dolphinscheduler.dev
+  names:
+    kind: DSApi
+    listKind: DSApiList
+    plural: dsapis
+    singular: dsapi
+  scope: Namespaced
+  versions:
+  - name: v1alpha1
+    schema:
+      openAPIV3Schema:
+        description: DSApi is the Schema for the dsapis API
+        properties:
+          apiVersion:
+            description: 'APIVersion defines the versioned schema of this 
representation
+              of an object. Servers should convert recognized schemas to the 
latest
+              internal value, and may reject unrecognized values. More info: 
https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
+            type: string
+          kind:
+            description: 'Kind is a string value representing the REST 
resource this
+              object represents. Servers may infer this from the endpoint the 
client
+              submits requests to. Cannot be updated. In CamelCase. More info: 
https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+            type: string
+          metadata:
+            type: object
+          spec:
+            description: DSApiSpec defines the desired state of DSApi
+            properties:
+              foo:
+                description: Foo is an example field of DSApi. Edit 
dsapi_types.go
+                  to remove/update
+                type: string
+            type: object
+          status:
+            description: DSApiStatus defines the observed state of DSApi
+            type: object
+        type: object
+    served: true
+    storage: true
+    subresources:
+      status: {}
+status:
+  acceptedNames:
+    kind: ""
+    plural: ""
+  conditions: []
+  storedVersions: []
diff --git a/config/crd/patches/cainjection_in_dsapis.yaml 
b/config/crd/patches/cainjection_in_dsapis.yaml
new file mode 100644
index 0000000..c11123f
--- /dev/null
+++ b/config/crd/patches/cainjection_in_dsapis.yaml
@@ -0,0 +1,7 @@
+# The following patch adds a directive for certmanager to inject CA into the 
CRD
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  annotations:
+    cert-manager.io/inject-ca-from: 
$(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME)
+  name: dsapis.ds.apache.dolphinscheduler.dev
diff --git a/config/crd/patches/webhook_in_dsapis.yaml 
b/config/crd/patches/webhook_in_dsapis.yaml
new file mode 100644
index 0000000..937789c
--- /dev/null
+++ b/config/crd/patches/webhook_in_dsapis.yaml
@@ -0,0 +1,16 @@
+# The following patch enables a conversion webhook for the CRD
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  name: dsapis.ds.apache.dolphinscheduler.dev
+spec:
+  conversion:
+    strategy: Webhook
+    webhook:
+      clientConfig:
+        service:
+          namespace: system
+          name: webhook-service
+          path: /convert
+      conversionReviewVersions:
+      - v1
diff --git a/config/rbac/dsapi_editor_role.yaml 
b/config/rbac/dsapi_editor_role.yaml
new file mode 100644
index 0000000..c7c0e68
--- /dev/null
+++ b/config/rbac/dsapi_editor_role.yaml
@@ -0,0 +1,24 @@
+# permissions for end users to edit dsapis.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: dsapi-editor-role
+rules:
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis/status
+  verbs:
+  - get
diff --git a/config/rbac/dsapi_viewer_role.yaml 
b/config/rbac/dsapi_viewer_role.yaml
new file mode 100644
index 0000000..acf5226
--- /dev/null
+++ b/config/rbac/dsapi_viewer_role.yaml
@@ -0,0 +1,20 @@
+# permissions for end users to view dsapis.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: dsapi-viewer-role
+rules:
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - ds.apache.dolphinscheduler.dev
+  resources:
+  - dsapis/status
+  verbs:
+  - get
diff --git a/config/samples/ds_v1alpha1_dsapi.yaml 
b/config/samples/ds_v1alpha1_dsapi.yaml
new file mode 100644
index 0000000..e3264d7
--- /dev/null
+++ b/config/samples/ds_v1alpha1_dsapi.yaml
@@ -0,0 +1,17 @@
+apiVersion: ds.apache.dolphinscheduler.dev/v1alpha1
+kind: DSApi
+metadata:
+  name: ds-api
+  namespace: ds
+  labels:
+    app: ds-api
+spec:
+  replicas: 1
+  version: 3.0.0-alpha
+  repository: apache/dolphinscheduler-api
+  node_port: 30002
+  datasource:
+    drive_name: "org.postgresql.Driver"
+    url: "jdbc:postgresql://172.17.0.4:5432/dolphinscheduler"
+    username: "postgresadmin"
+    password: "admin12345"
\ No newline at end of file
diff --git a/controllers/dsapi_controller.go b/controllers/dsapi_controller.go
new file mode 100644
index 0000000..8d74913
--- /dev/null
+++ b/controllers/dsapi_controller.go
@@ -0,0 +1,221 @@
+/*
+Copyright 2022.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+       "context"
+       dsv1alpha1 "dolphinscheduler-operator/api/v1alpha1"
+       v1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
+       "k8s.io/client-go/tools/record"
+       ctrl "sigs.k8s.io/controller-runtime"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+       "time"
+)
+
+var (
+       // apiLogger is the named logger used by the DSApi controller.
+       apiLogger = ctrl.Log.WithName("DSApi-controller")
+)
+
+// DSApiReconciler reconciles a DSApi object
+type DSApiReconciler struct {
+       // Client is the Kubernetes client used to read and write objects.
+       client.Client
+       // Scheme is passed to SetControllerReference when owned objects are created.
+       Scheme *runtime.Scheme
+       // Recorder emits events about the reconciled DSApi.
+       Recorder record.EventRecorder
+}
+
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis,verbs=get;list;watch;create;update;patch;delete
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/status,verbs=get;update;patch
+//+kubebuilder:rbac:groups=ds.apache.dolphinscheduler.dev,resources=dsapis/finalizers,verbs=update
+
+// Reconcile is part of the main kubernetes reconciliation loop which aims to
+// move the current state of the cluster closer to the desired state.
+// TODO(user): Modify the Reconcile function to compare the state specified by
+// the DSApi object against the actual cluster state, and then
+// perform operations to make the cluster state reflect the state specified by
+// the user.
+//
+// For more details, check Reconcile and its Result here:
+// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
+func (r *DSApiReconciler) Reconcile(ctx context.Context, req ctrl.Request) 
(ctrl.Result, error) {
+       apiLogger.Info("dmApi start reconcile logic")
+       defer apiLogger.Info("dmApi Reconcile end 
---------------------------------------------")
+
+       cluster := &dsv1alpha1.DSApi{}
+
+       if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
+               if errors.IsNotFound(err) {
+                       r.Recorder.Event(cluster, corev1.EventTypeWarning, 
"dsApi is not Found", "dsApi is not Found")
+                       return ctrl.Result{}, nil
+               }
+               return ctrl.Result{}, err
+       }
+       desired := cluster.DeepCopy()
+
+       // Handler finalizer
+       // examine DeletionTimestamp to determine if object is under deletion
+       if cluster.ObjectMeta.DeletionTimestamp.IsZero() {
+               // The object is not being deleted, so if it does not have our 
finalizer,
+               // then lets add the finalizer and update the object. This is 
equivalent
+               // registering our finalizer.
+               if !controllerutil.ContainsFinalizer(desired, 
dsv1alpha1.FinalizerName) {
+                       controllerutil.AddFinalizer(desired, 
dsv1alpha1.FinalizerName)
+                       if err := r.Update(ctx, desired); err != nil {
+                               return ctrl.Result{}, err
+                       }
+               }
+       } else {
+               // The object is being deleted
+
+               if controllerutil.ContainsFinalizer(desired, 
dsv1alpha1.FinalizerName) {
+                       // our finalizer is present, so lets handle any 
external dependency
+                       if err := r.ensureDSApiDeleted(ctx, cluster); err != 
nil {
+                               return ctrl.Result{}, err
+                       }
+
+                       // remove our finalizer from the list and update it.
+                       controllerutil.RemoveFinalizer(desired, 
dsv1alpha1.FinalizerName)
+                       if err := r.Update(ctx, desired); err != nil {
+                               return ctrl.Result{}, err
+                       }
+               }
+               // Stop reconciliation as the item is being deleted
+               return ctrl.Result{}, nil
+       }
+
+       if cluster.Spec.Paused {
+               apiLogger.Info("ds-Api control has been paused: ", 
"ds-Api-name", cluster.Name)
+               desired.Status.ControlPaused = true
+               if err := r.Status().Patch(ctx, desired, 
client.MergeFrom(cluster)); err != nil {
+                       return ctrl.Result{}, err
+               }
+               r.Recorder.Event(cluster, corev1.EventTypeNormal, "the spec 
status is paused", "do nothing")
+               return ctrl.Result{}, nil
+       }
+
+       // 1. First time we see the ds-master-cluster, initialize it
+       if cluster.Status.Phase == dsv1alpha1.DsPhaseNone {
+               desired.Status.Phase = dsv1alpha1.DsPhaseCreating
+               apiLogger.Info("phase had been changed from  none ---> 
creating")
+               err := r.Client.Status().Patch(ctx, desired, 
client.MergeFrom(cluster))
+               return ctrl.Result{RequeueAfter: 100 * time.Millisecond}, err
+       }
+
+       //2 ensure the Api service
+       apiLogger.Info("Ensuring Api service")
+
+       if err := r.ensureApiService(ctx, cluster); err != nil {
+               return ctrl.Result{Requeue: true}, nil
+       }
+
+       if requeue, err := r.ensureApiDeployment(ctx, cluster); err != nil {
+               return ctrl.Result{Requeue: false}, err
+       } else {
+               if !requeue {
+                       return ctrl.Result{Requeue: false}, nil
+               }
+       }
+
+       apiLogger.Info("******************************************************")
+       desired.Status.Phase = dsv1alpha1.DsPhaseNone
+       if err := r.Update(ctx, desired); err != nil {
+               return ctrl.Result{}, err
+       }
+       return ctrl.Result{Requeue: false}, nil
+}
+
+// SetupWithManager registers the reconciler with the Manager: it is driven by
+// DSApi objects and by the Deployments, Services and Pods they own.
+func (r *DSApiReconciler) SetupWithManager(mgr ctrl.Manager) error {
+       bldr := ctrl.NewControllerManagedBy(mgr).For(&dsv1alpha1.DSApi{})
+       ownedKinds := []client.Object{
+               &v1.Deployment{},
+               &corev1.Service{},
+               &corev1.Pod{},
+       }
+       for _, owned := range ownedKinds {
+               bldr = bldr.Owns(owned)
+       }
+       return bldr.Complete(r)
+}
+
+// ensureDSApiDeleted issues a delete for the given DSApi with the orphan
+// propagation policy and returns the client's error, if any.
+func (r *DSApiReconciler) ensureDSApiDeleted(ctx context.Context, DSApi *dsv1alpha1.DSApi) error {
+       orphan := client.PropagationPolicy(metav1.DeletePropagationOrphan)
+       return r.Client.Delete(ctx, DSApi, orphan)
+}
+
+// ensureApiService makes sure the api Service exists in the namespace of the
+// given DSApi. An existing Service is left untouched; a missing one is built
+// with createApiService, owned by the DSApi and created.
+//
+// Any error from reading, owning or creating the Service is returned so the
+// caller can retry; a read failure other than "not found" is no longer
+// treated as success.
+func (r *DSApiReconciler) ensureApiService(ctx context.Context, cluster *dsv1alpha1.DSApi) error {
+       service := &corev1.Service{}
+       namespacedName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiServiceValue}
+
+       err := r.Client.Get(ctx, namespacedName, service)
+       if err == nil {
+               // The Service already exists, nothing to do.
+               return nil
+       }
+       if !apierrors.IsNotFound(err) {
+               apiLogger.Error(err, "get service error")
+               return err
+       }
+
+       // Not found in the local cache: build the Service and create it.
+       service = createApiService(cluster)
+       if err := controllerutil.SetControllerReference(cluster, service, r.Scheme); err != nil {
+               apiLogger.Error(err, "create Api service error")
+               return err
+       }
+       // The remote object may already exist even though the cache missed it; in
+       // that case the error is returned and the next reconcile finds the Service.
+       if err := r.Client.Create(ctx, service); err != nil {
+               apiLogger.Error(err, "create Api service error1")
+               return err
+       }
+       apiLogger.Info("the Api service had been created")
+       return nil
+}
+
+// ensureApiDeployment makes sure the api Deployment exists and matches the
+// spec of the given DSApi.
+//
+// When the Deployment exists it is updated through updateApiDeployment and
+// the function returns true, which lets Reconcile continue. When it is
+// missing it is built with createApiDeployment, owned by the DSApi and
+// created, and the function returns false, which makes Reconcile stop for
+// this pass.
+//
+// A read failure other than "not found" is returned as is; the function no
+// longer tries to create an empty Deployment in that case.
+func (r *DSApiReconciler) ensureApiDeployment(ctx context.Context, cluster *dsv1alpha1.DSApi) (bool, error) {
+       deployment := &v1.Deployment{}
+       deploymentNamespaceName := types.NamespacedName{Namespace: cluster.Namespace, Name: dsv1alpha1.DsApiDeploymentValue}
+
+       err := r.Client.Get(ctx, deploymentNamespaceName, deployment)
+       switch {
+       case err == nil:
+               // The Deployment exists: bring it in line with the spec.
+               if err := r.updateApiDeployment(ctx, deployment, cluster); err != nil {
+                       return false, err
+               }
+               return true, nil
+       case !apierrors.IsNotFound(err):
+               // A real read failure; `deployment` is still empty and must not be created.
+               return false, err
+       }
+
+       // The Deployment is missing: build it, own it and create it.
+       deployment = createApiDeployment(cluster)
+       if err := controllerutil.SetControllerReference(cluster, deployment, r.Scheme); err != nil {
+               return true, err
+       }
+       if err := r.Client.Create(ctx, deployment); err != nil {
+               return true, err
+       }
+       return false, nil
+}
+
+// updateApiDeployment copies the operator-managed fields from the DSApi spec
+// onto an existing deployment - the replica count and the image of the first
+// container (built from the repository and the version) - and writes the
+// deployment back through the client.
+func (r *DSApiReconciler) updateApiDeployment(ctx context.Context, deployment *v1.Deployment, cluster *dsv1alpha1.DSApi) error {
+       wantReplicas := int32Ptr(int32(cluster.Spec.Replicas))
+       deployment.Spec.Replicas = wantReplicas
+
+       wantImage := ImageName(cluster.Spec.Repository, cluster.Spec.Version)
+       deployment.Spec.Template.Spec.Containers[0].Image = wantImage
+
+       return r.Client.Update(ctx, deployment)
+}

Reply via email to