This is an automated email from the ASF dual-hosted git repository.

kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 20b76e6  SUBMARINE-900. Create a submarine mlflow for single user
20b76e6 is described below

commit 20b76e65177a86a6fd1a911a4f0f5f768f24b03e
Author: Kenchu123 <[email protected]>
AuthorDate: Mon Jul 5 19:57:43 2021 +0800

    SUBMARINE-900. Create a submarine mlflow for single user
    
    ### What is this PR for?
    
    Create a Submarine MLflow instance via the operator for a single user.
    
    ### What type of PR is it?
    
    [Feature]
    
    ### Todos
    
    ### What is the Jira issue?
    
    https://issues.apache.org/jira/browse/SUBMARINE-900
    
    ### How should this be tested?
    
    ### Screenshots (if appropriate)
    
    
https://user-images.githubusercontent.com/17617373/124424951-72767500-dd9a-11eb-8e4c-db21ea8d56c6.mov
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: Kenchu123 <[email protected]>
    
    Signed-off-by: Kai-Hsun Chen <[email protected]>
    
    Closes #645 from Kenchu123/SUBMARINE-900 and squashes the following commits:
    
    77ddabf0 [Kenchu123] SUBMARINE-900. rerun gofmt on mlflow
    203b3908 [Kenchu123] SUBMARINE-900. change tab indent to spaces in 
controller and fix the format difference
    4d1e391c [Kenchu123] SUBMARINE-900. Add type to mlflow service
    b1136cd1 [Kenchu123] SUBMARINE-900. Create mlflow on applying a submarine
---
 submarine-cloud-v2/go.mod                          |   4 +-
 submarine-cloud-v2/pkg/controller/controller.go    |  10 +
 .../pkg/controller/submarine_mlflow.go             | 342 +++++++++++++++++++++
 3 files changed, 354 insertions(+), 2 deletions(-)

diff --git a/submarine-cloud-v2/go.mod b/submarine-cloud-v2/go.mod
index 96634f0..330a66f 100644
--- a/submarine-cloud-v2/go.mod
+++ b/submarine-cloud-v2/go.mod
@@ -3,10 +3,10 @@ module github.com/apache/submarine/submarine-cloud-v2
 go 1.16
 
 require (
-       github.com/fatih/color v1.7.0
+       github.com/fatih/color v1.7.0 // indirect
        github.com/gofrs/flock v0.8.0
        github.com/pkg/errors v0.9.1
-       github.com/stretchr/testify v1.7.0
+       github.com/stretchr/testify v1.7.0 // indirect
        github.com/traefik/traefik/v2 v2.4.8
        golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
        gopkg.in/yaml.v2 v2.4.0
diff --git a/submarine-cloud-v2/pkg/controller/controller.go 
b/submarine-cloud-v2/pkg/controller/controller.go
index 8d46bef..bc9eaab 100644
--- a/submarine-cloud-v2/pkg/controller/controller.go
+++ b/submarine-cloud-v2/pkg/controller/controller.go
@@ -66,6 +66,7 @@ const (
        serverName                  = "submarine-server"
        databaseName                = "submarine-database"
        tensorboardName             = "submarine-tensorboard"
+       mlflowName                  = "submarine-mlflow"
        ingressName                 = serverName + "-ingress"
        databasePvNamePrefix        = databaseName + "-pv"
        databasePvcName             = databaseName + "-pvc"
@@ -73,6 +74,10 @@ const (
        tensorboardPvcName          = tensorboardName + "-pvc"
        tensorboardServiceName      = tensorboardName + "-service"
        tensorboardIngressRouteName = tensorboardName + "-ingressroute"
+       mlflowPvNamePrefix          = mlflowName + "-pv"
+       mlflowPvcName               = mlflowName + "-pvc"
+       mlflowServiceName           = mlflowName + "-service"
+       mlflowIngressRouteName      = mlflowName + "-ingressroute"
 )
 
 // PersistentVolumes are not namespaced resources, so we add the namespace as a
@@ -480,6 +485,11 @@ func (c *Controller) syncHandler(workqueueItem 
WorkQueueItem) error {
                        return err
                }
 
+               err = c.createSubmarineMlflow(submarine)
+               if err != nil {
+                       return err
+               }
+
                err = c.updateSubmarineStatus(submarine, serverDeployment, 
databaseDeployment)
                if err != nil {
                        return err
diff --git a/submarine-cloud-v2/pkg/controller/submarine_mlflow.go 
b/submarine-cloud-v2/pkg/controller/submarine_mlflow.go
new file mode 100644
index 0000000..3f431e9
--- /dev/null
+++ b/submarine-cloud-v2/pkg/controller/submarine_mlflow.go
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package controller
+
+import (
+       "context"
+       "fmt"
+
+       v1alpha1 
"github.com/apache/submarine/submarine-cloud-v2/pkg/apis/submarine/v1alpha1"
+       traefikv1alpha1 
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1"
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/api/resource"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/intstr"
+       "k8s.io/klog/v2"
+)
+
+// newSubmarineMlflowPersistentVolume builds the PersistentVolume used by
+// submarine-mlflow. The volume source is chosen from
+// submarine.Spec.Storage.StorageType: "nfs" yields an NFS source and "host" a
+// HostPath source of type DirectoryOrCreate; any other value leaves the source
+// empty. The capacity is parsed from submarine.Spec.Mlflow.StorageSize.
+// NOTE(review): resource.MustParse presumably panics on a malformed size, as
+// Must-style helpers usually do -- confirm the value is validated upstream.
+func newSubmarineMlflowPersistentVolume(submarine *v1alpha1.Submarine) *corev1.PersistentVolume {
+       var source corev1.PersistentVolumeSource
+       switch submarine.Spec.Storage.StorageType {
+       case "nfs":
+               source.NFS = &corev1.NFSVolumeSource{
+                       Server: submarine.Spec.Storage.NfsIP,
+                       Path:   submarine.Spec.Storage.NfsPath,
+               }
+       case "host":
+               dirOrCreate := corev1.HostPathDirectoryOrCreate
+               source.HostPath = &corev1.HostPathVolumeSource{
+                       Path: submarine.Spec.Storage.HostPath,
+                       Type: &dirOrCreate,
+               }
+       }
+
+       owner := metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine"))
+       capacity := corev1.ResourceList{
+               corev1.ResourceStorage: resource.MustParse(submarine.Spec.Mlflow.StorageSize),
+       }
+
+       volume := &corev1.PersistentVolume{}
+       // PersistentVolumes are cluster-scoped, so the name carries the namespace.
+       volume.Name = pvName(mlflowPvNamePrefix, submarine.Namespace)
+       volume.OwnerReferences = []metav1.OwnerReference{*owner}
+       volume.Spec = corev1.PersistentVolumeSpec{
+               AccessModes:            []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+               Capacity:               capacity,
+               PersistentVolumeSource: source,
+       }
+       return volume
+}
+
+// newSubmarineMlflowPersistentVolumeClaim builds the PersistentVolumeClaim of
+// submarine-mlflow. The claim names the MLflow PersistentVolume explicitly via
+// VolumeName, sets an empty storage class name, and requests
+// submarine.Spec.Mlflow.StorageSize of storage with ReadWriteMany access.
+func newSubmarineMlflowPersistentVolumeClaim(submarine *v1alpha1.Submarine) *corev1.PersistentVolumeClaim {
+       emptyStorageClass := ""
+       owner := metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine"))
+       requests := corev1.ResourceList{
+               corev1.ResourceStorage: resource.MustParse(submarine.Spec.Mlflow.StorageSize),
+       }
+
+       claim := &corev1.PersistentVolumeClaim{}
+       claim.Name = mlflowPvcName
+       claim.OwnerReferences = []metav1.OwnerReference{*owner}
+       claim.Spec = corev1.PersistentVolumeClaimSpec{
+               AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+               Resources:        corev1.ResourceRequirements{Requests: requests},
+               VolumeName:       pvName(mlflowPvNamePrefix, submarine.Namespace),
+               StorageClassName: &emptyStorageClass,
+       }
+       return claim
+}
+
+func newSubmarineMlflowDeployment(submarine *v1alpha1.Submarine) 
*appsv1.Deployment {
+       return &appsv1.Deployment{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name: mlflowName,
+                       OwnerReferences: []metav1.OwnerReference{
+                               *metav1.NewControllerRef(submarine, 
v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+                       },
+               },
+               Spec: appsv1.DeploymentSpec{
+                       Selector: &metav1.LabelSelector{
+                               MatchLabels: map[string]string{
+                                       "app": mlflowName + "-pod",
+                               },
+                       },
+                       Template: corev1.PodTemplateSpec{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Labels: map[string]string{
+                                               "app": mlflowName + "-pod",
+                                       },
+                               },
+                               Spec: corev1.PodSpec{
+                                       Containers: []corev1.Container{
+                                               {
+                                                       Name:            
mlflowName + "-container",
+                                                       Image:           
"apache/submarine:mlflow-0.6.0-SNAPSHOT",
+                                                       ImagePullPolicy: 
"IfNotPresent",
+                                                       Ports: 
[]corev1.ContainerPort{
+                                                               {
+                                                                       
ContainerPort: 5000,
+                                                               },
+                                                       },
+                                                       VolumeMounts: 
[]corev1.VolumeMount{
+                                                               {
+                                                                       
MountPath: "/logs",
+                                                                       Name:   
   "volume",
+                                                                       
SubPath:   mlflowName,
+                                                               },
+                                                       },
+                                                       ReadinessProbe: 
&corev1.Probe{
+                                                               Handler: 
corev1.Handler{
+                                                                       
TCPSocket: &corev1.TCPSocketAction{
+                                                                               
Port: intstr.FromInt(5000),
+                                                                       },
+                                                               },
+                                                               
InitialDelaySeconds: 60,
+                                                               PeriodSeconds:  
     10,
+                                                       },
+                                               },
+                                       },
+                                       Volumes: []corev1.Volume{
+                                               {
+                                                       Name: "volume",
+                                                       VolumeSource: 
corev1.VolumeSource{
+                                                               
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+                                                                       
ClaimName: mlflowPvcName,
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+}
+
+func newSubmarineMlflowService(submarine *v1alpha1.Submarine) *corev1.Service {
+       return &corev1.Service{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name: mlflowServiceName,
+                       OwnerReferences: []metav1.OwnerReference{
+                               *metav1.NewControllerRef(submarine, 
v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+                       },
+               },
+               Spec: corev1.ServiceSpec{
+                       Type: corev1.ServiceTypeClusterIP,
+                       Selector: map[string]string{
+                               "app": mlflowName + "-pod",
+                       },
+                       Ports: []corev1.ServicePort{
+                               {
+                                       Protocol:   "TCP",
+                                       Port:       5000,
+                                       TargetPort: intstr.FromInt(5000),
+                               },
+                       },
+               },
+       }
+}
+
+// newSubmarineMlflowIngressRoute builds the Traefik IngressRoute that exposes
+// submarine-mlflow. Requests arriving on the "web" entry point whose path
+// starts with /mlflow are routed to port 5000 of the MLflow Service.
+func newSubmarineMlflowIngressRoute(submarine *v1alpha1.Submarine) *traefikv1alpha1.IngressRoute {
+       return &traefikv1alpha1.IngressRoute{
+               ObjectMeta: metav1.ObjectMeta{
+                       // Use the shared constant: createSubmarineMlflow looks the
+                       // object up by mlflowIngressRouteName, so both sides must
+                       // agree on the name.
+                       Name: mlflowIngressRouteName,
+                       OwnerReferences: []metav1.OwnerReference{
+                               *metav1.NewControllerRef(submarine, v1alpha1.SchemeGroupVersion.WithKind("Submarine")),
+                       },
+               },
+               Spec: traefikv1alpha1.IngressRouteSpec{
+                       EntryPoints: []string{
+                               "web",
+                       },
+                       Routes: []traefikv1alpha1.Route{
+                               {
+                                       Kind:  "Rule",
+                                       Match: "PathPrefix(`/mlflow`)",
+                                       Services: []traefikv1alpha1.Service{
+                                               {
+                                                       LoadBalancerSpec: traefikv1alpha1.LoadBalancerSpec{
+                                                               Kind: "Service",
+                                                               Name: mlflowServiceName,
+                                                               Port: 5000,
+                                                       },
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+}
+
+// createSubmarineMlflow is a function to create submarine-mlflow.
+// It looks up, in order, the PersistentVolume, PersistentVolumeClaim,
+// Deployment, Service and IngressRoute of submarine-mlflow and creates each one
+// that its lister reports as not found. A failed Get or Create is returned
+// right away, and so is the case where an existing object is not controlled
+// by the given Submarine resource.
+// Reference: https://github.com/apache/submarine/blob/master/helm-charts/submarine/templates/submarine-mlflow.yaml
+func (c *Controller) createSubmarineMlflow(submarine *v1alpha1.Submarine) error {
+       klog.Info("[createSubmarineMlflow]")
+
+       // Step 1: Create PersistentVolume
+       // PersistentVolumes are not namespaced resources, so we add the namespace
+       // as a suffix to distinguish them
+       pv, err := c.persistentvolumeLister.Get(pvName(mlflowPvNamePrefix, submarine.Namespace))
+       // If the resource doesn't exist, we'll create it
+       if errors.IsNotFound(err) {
+               pv, err = c.kubeclientset.CoreV1().PersistentVolumes().Create(context.TODO(), newSubmarineMlflowPersistentVolume(submarine), metav1.CreateOptions{})
+               if err != nil {
+                       // Stop here: the returned object must not be used (or
+                       // reported as created) after a failed Create.
+                       klog.Info(err)
+                       return err
+               }
+               klog.Info(" Create PersistentVolume: ", pv.Name)
+       }
+       // If an error occurs during Get, we'll requeue the item so we can
+       // attempt processing again later. This could have been caused by a
+       // temporary network failure, or any other transient reason.
+       if err != nil {
+               return err
+       }
+       if err = c.ensureMlflowResourceOwned(pv, submarine); err != nil {
+               return err
+       }
+
+       // Step 2: Create PersistentVolumeClaim
+       pvc, err := c.persistentvolumeclaimLister.PersistentVolumeClaims(submarine.Namespace).Get(mlflowPvcName)
+       // If the resource doesn't exist, we'll create it
+       if errors.IsNotFound(err) {
+               pvc, err = c.kubeclientset.CoreV1().PersistentVolumeClaims(submarine.Namespace).Create(context.TODO(),
+                       newSubmarineMlflowPersistentVolumeClaim(submarine),
+                       metav1.CreateOptions{})
+               if err != nil {
+                       klog.Info(err)
+                       return err
+               }
+               klog.Info("     Create PersistentVolumeClaim: ", pvc.Name)
+       }
+       if err != nil {
+               return err
+       }
+       if err = c.ensureMlflowResourceOwned(pvc, submarine); err != nil {
+               return err
+       }
+
+       // Step 3: Create Deployment
+       deployment, err := c.deploymentLister.Deployments(submarine.Namespace).Get(mlflowName)
+       // If the resource doesn't exist, we'll create it
+       if errors.IsNotFound(err) {
+               deployment, err = c.kubeclientset.AppsV1().Deployments(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowDeployment(submarine), metav1.CreateOptions{})
+               if err != nil {
+                       klog.Info(err)
+                       return err
+               }
+               klog.Info("     Create Deployment: ", deployment.Name)
+       }
+       if err != nil {
+               return err
+       }
+       if err = c.ensureMlflowResourceOwned(deployment, submarine); err != nil {
+               return err
+       }
+
+       // Step 4: Create Service
+       service, err := c.serviceLister.Services(submarine.Namespace).Get(mlflowServiceName)
+       // If the resource doesn't exist, we'll create it
+       if errors.IsNotFound(err) {
+               service, err = c.kubeclientset.CoreV1().Services(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowService(submarine), metav1.CreateOptions{})
+               if err != nil {
+                       klog.Info(err)
+                       return err
+               }
+               klog.Info(" Create Service: ", service.Name)
+       }
+       if err != nil {
+               return err
+       }
+       if err = c.ensureMlflowResourceOwned(service, submarine); err != nil {
+               return err
+       }
+
+       // Step 5: Create IngressRoute
+       ingressroute, err := c.ingressrouteLister.IngressRoutes(submarine.Namespace).Get(mlflowIngressRouteName)
+       // If the resource doesn't exist, we'll create it
+       if errors.IsNotFound(err) {
+               ingressroute, err = c.traefikclientset.TraefikV1alpha1().IngressRoutes(submarine.Namespace).Create(context.TODO(), newSubmarineMlflowIngressRoute(submarine), metav1.CreateOptions{})
+               if err != nil {
+                       klog.Info(err)
+                       return err
+               }
+               klog.Info(" Create IngressRoute: ", ingressroute.Name)
+       }
+       if err != nil {
+               return err
+       }
+       return c.ensureMlflowResourceOwned(ingressroute, submarine)
+}
+
+// ensureMlflowResourceOwned returns nil when obj is controlled by submarine.
+// Otherwise it records a warning event on submarine and returns an error
+// carrying the same message.
+func (c *Controller) ensureMlflowResourceOwned(obj metav1.Object, submarine *v1alpha1.Submarine) error {
+       if metav1.IsControlledBy(obj, submarine) {
+               return nil
+       }
+       msg := fmt.Sprintf(MessageResourceExists, obj.GetName())
+       c.recorder.Event(submarine, corev1.EventTypeWarning, ErrResourceExists, msg)
+       // msg is already formatted; pass it as an argument, not as the format.
+       return fmt.Errorf("%s", msg)
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to