This is an automated email from the ASF dual-hosted git repository.

kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c145f5  SUBMARINE-771. Implement Controller Pattern
8c145f5 is described below

commit 8c145f50c55742ba4a9456ee439abd279f4e1d23
Author: kevin85421 <[email protected]>
AuthorDate: Wed Mar 31 15:31:26 2021 +0800

    SUBMARINE-771. Implement Controller Pattern
---
 .gitignore                                         |   1 +
 submarine-cloud-v2/README.md                       |   6 +
 submarine-cloud-v2/controller.go                   | 185 +++++++++++++++++++++
 submarine-cloud-v2/go.sum                          |   1 +
 submarine-cloud-v2/main.go                         |  46 ++++-
 .../{main.go => pkg/signals/signal.go}             |  35 ++--
 .../{main.go => pkg/signals/signal_posix.go}       |  22 +--
 .../{main.go => pkg/signals/signal_windows.go}     |  21 +--
 8 files changed, 262 insertions(+), 55 deletions(-)

diff --git a/.gitignore b/.gitignore
index 633e62a..25e6577 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,6 +87,7 @@ submarine-security/spark-security/derby.log
 
 # submarine-cloud-v2
 submarine-cloud-v2/vendor/*
+submarine-cloud-v2/submarine-operator
 
 # vscode file
 .project
diff --git a/submarine-cloud-v2/README.md b/submarine-cloud-v2/README.md
index 070d71e..7490e4a 100644
--- a/submarine-cloud-v2/README.md
+++ b/submarine-cloud-v2/README.md
@@ -38,4 +38,10 @@ kubectl apply -f artifacts/examples/example-submarine.yaml
 
 # Step3: Run unit test
 go test
+```
+
+# Build Project
+```bash
+go build -o submarine-operator
+./submarine-operator
 ```
\ No newline at end of file
diff --git a/submarine-cloud-v2/controller.go b/submarine-cloud-v2/controller.go
new file mode 100644
index 0000000..fc5d25d
--- /dev/null
+++ b/submarine-cloud-v2/controller.go
@@ -0,0 +1,185 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package main
+
+import (
+       "fmt"
+       "time"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/kubernetes/scheme"
+       appsinformers "k8s.io/client-go/informers/apps/v1"
+       utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+       clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
+       informers 
"submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
+       typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+       "k8s.io/klog/v2"
+       "k8s.io/client-go/util/workqueue"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/client-go/tools/record"
+       "k8s.io/apimachinery/pkg/util/wait"
+       submarinescheme 
"submarine-cloud-v2/pkg/generated/clientset/versioned/scheme"
+)
+
+const controllerAgentName = "submarine-controller"
+
+// Controller is the controller implementation for Foo resources
+type Controller struct {
+       // kubeclientset is a standard kubernetes clientset
+       kubeclientset kubernetes.Interface
+       // sampleclientset is a clientset for our own API group
+       submarineclientset clientset.Interface
+       
+       submarinesSynced        cache.InformerSynced
+       // workqueue is a rate limited work queue. This is used to queue work 
to be
+       // processed instead of performing it as soon as a change happens. This
+       // means we can ensure we only process a fixed amount of resources at a
+       // time, and makes it easy to ensure we are never processing the same 
item
+       // simultaneously in two different workers.
+       workqueue workqueue.RateLimitingInterface
+       // recorder is an event recorder for recording Event resources to the
+       // Kubernetes API.
+       recorder record.EventRecorder
+}
+
+// NewController returns a new sample controller
+func NewController(
+       kubeclientset kubernetes.Interface,
+       submarineclientset clientset.Interface,
+       deploymentInformer appsinformers.DeploymentInformer,
+       submarineInformer informers.SubmarineInformer) *Controller {
+
+       // TODO: Create event broadcaster
+       // Add Submarine types to the default Kubernetes Scheme so Events can be
+       // logged for Submarine types.
+       utilruntime.Must(submarinescheme.AddToScheme(scheme.Scheme))
+       klog.V(4).Info("Creating event broadcaster")
+       eventBroadcaster := record.NewBroadcaster()
+       eventBroadcaster.StartStructuredLogging(0)
+       
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: 
kubeclientset.CoreV1().Events("")})
+       recorder := eventBroadcaster.NewRecorder(scheme.Scheme, 
corev1.EventSource{Component: controllerAgentName})
+
+
+       // Initialize controller
+       controller := &Controller{
+               kubeclientset:     kubeclientset,
+               submarineclientset:   submarineclientset,
+               submarinesSynced:        submarineInformer.Informer().HasSynced,
+               workqueue:         
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"Submarines"),
+               recorder:          recorder,
+       }
+
+       // Setting up event handler for Submarine
+       klog.Info("Setting up event handlers")
+       
submarineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+               AddFunc: controller.enqueueSubmarine,
+               UpdateFunc: func(old, new interface{}) {
+                       controller.enqueueSubmarine(new)
+               },
+       })
+
+       // TODO: Setting up event handler for other resources. E.g. namespace
+
+       return controller
+}
+
+func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
+       defer utilruntime.HandleCrash()
+       defer c.workqueue.ShutDown()
+
+       // Start the informer factories to begin populating the informer caches
+       klog.Info("Starting Submarine controller")
+
+       // Wait for the caches to be synced before starting workers
+       klog.Info("Waiting for informer caches to sync")
+       if ok := cache.WaitForCacheSync(stopCh, c.submarinesSynced); !ok {
+               return fmt.Errorf("failed to wait for caches to sync")
+       }
+
+       klog.Info("Starting workers")
+       // Launch two workers to process Submarine resources
+       for i := 0; i < threadiness; i++ {
+               go wait.Until(c.runWorker, time.Second, stopCh)
+       }
+
+       klog.Info("Started workers")
+       <-stopCh
+       klog.Info("Shutting down workers")
+
+       return nil
+}
+
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (c *Controller) runWorker() {
+       for c.processNextWorkItem() {
+       }
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it, by calling the syncHandler.
+func (c *Controller) processNextWorkItem() bool {
+       obj, shutdown := c.workqueue.Get()
+       if shutdown {
+               return false
+       }
+
+       // We wrap this block in a func so we can defer c.workqueue.Done.
+       err := func(obj interface{}) error {
+               // TODO: Maintain workqueue
+               defer c.workqueue.Done(obj)
+               key, _ := obj.(string)
+               c.syncHandler(key)
+               c.workqueue.Forget(obj)
+               klog.Infof("Successfully synced '%s'", key)
+               return nil
+       }(obj)
+
+       if err != nil {
+               utilruntime.HandleError(err)
+               return true
+       }
+
+       return true
+}
+
+// syncHandler compares the actual state with the desired, and attempts to
+// converge the two. It then updates the Status block of the Foo resource
+// with the current status of the resource.
+func (c *Controller) syncHandler(key string) error {
+       // TODO: business logic
+       klog.Info("syncHandler: ", key)
+       return nil
+}
+
+// enqueueFoo takes a Submarine resource and converts it into a namespace/name
+// string which is then put onto the work queue. This method should *not* be
+// passed resources of any type other than Submarine.
+func (c *Controller) enqueueSubmarine(obj interface{}) {
+       var key string
+       var err error
+       if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+               utilruntime.HandleError(err)
+               return
+       }
+
+       // key: [namespace]/[CR name]
+       // Example: default/example-submarine 
+       c.workqueue.Add(key)
+}
diff --git a/submarine-cloud-v2/go.sum b/submarine-cloud-v2/go.sum
index 3433a21..6649c0e 100644
--- a/submarine-cloud-v2/go.sum
+++ b/submarine-cloud-v2/go.sum
@@ -83,6 +83,7 @@ github.com/gogo/protobuf v1.3.1/go.mod 
h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod 
h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e 
h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/mock v1.1.1/go.mod 
h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.2.0/go.mod 
h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/main.go
index d59d924..b09ebea 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/main.go
@@ -19,8 +19,15 @@ package main
 
 import (
        "flag"
+       "time"
+       kubeinformers "k8s.io/client-go/informers"
        "k8s.io/klog/v2"
+       "k8s.io/client-go/tools/clientcmd"
+       "k8s.io/client-go/kubernetes"
        "os"
+       "submarine-cloud-v2/pkg/signals"
+       clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
+       informers "submarine-cloud-v2/pkg/generated/informers/externalversions"
 )
 
 var (
@@ -32,7 +39,44 @@ func main() {
        klog.InitFlags(nil)
        flag.Parse()
 
-       // TODO: Create a Submarine operator    
+       // set up signals so we handle the first shutdown signal gracefully
+       stopCh := signals.SetupSignalHandler()
+
+       cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
+       if err != nil {
+               klog.Fatalf("Error building kubeconfig: %s", err.Error())
+       }
+
+       kubeClient, err := kubernetes.NewForConfig(cfg)
+       if err != nil {
+               klog.Fatalf("Error building kubernetes clientset: %s", 
err.Error())
+       }
+
+       submarineClient, err := clientset.NewForConfig(cfg)
+       if err != nil {
+               klog.Fatalf("Error building submarine clientset: %s", 
err.Error())
+       }
+
+       kubeInformerFactory := 
kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
+       submarineInformerFactory := 
informers.NewSharedInformerFactory(submarineClient, time.Second*30)
+       
+       // TODO: Pass informers to NewController()
+       //       ex: namespace informer
+
+       // Create a Submarine operator
+       controller := NewController(kubeClient, submarineClient,
+               kubeInformerFactory.Apps().V1().Deployments(),
+               submarineInformerFactory.Submarine().V1alpha1().Submarines())
+
+       // notice that there is no need to run Start methods in a separate 
goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
+       // Start method is non-blocking and runs all registered informers in a 
dedicated goroutine.
+       kubeInformerFactory.Start(stopCh)
+       submarineInformerFactory.Start(stopCh)
+
+       // Run controller
+       if err = controller.Run(2, stopCh); err != nil {
+               klog.Fatalf("Error running controller: %s", err.Error())
+       }
 }
 
 func init() {
diff --git a/submarine-cloud-v2/main.go 
b/submarine-cloud-v2/pkg/signals/signal.go
similarity index 58%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal.go
index d59d924..3e783c6 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal.go
@@ -15,27 +15,30 @@
  * limitations under the License.
  */
 
-package main
+package signals
 
 import (
-       "flag"
-       "k8s.io/klog/v2"
        "os"
+       "os/signal"
 )
 
-var (
-       masterURL  string
-       kubeconfig string
-)
+var onlyOneSignalHandler = make(chan struct{})
 
-func main() {
-       klog.InitFlags(nil)
-       flag.Parse()
+// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is 
returned
+// which is closed on one of these signals. If a second signal is caught, the 
program
+// is terminated with exit code 1.
+func SetupSignalHandler() (stopCh <-chan struct{}) {
+       close(onlyOneSignalHandler) // panics when called twice
 
-       // TODO: Create a Submarine operator    
-}
+       stop := make(chan struct{})
+       c := make(chan os.Signal, 2)
+       signal.Notify(c, shutdownSignals...)
+       go func() {
+               <-c
+               close(stop)
+               <-c
+               os.Exit(1) // second signal. Exit directly.
+       }()
 
-func init() {
-       flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + 
"/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
-       flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes 
API server. Overrides any value in kubeconfig. Only required if 
out-of-cluster.")
-}
\ No newline at end of file
+       return stop
+}
diff --git a/submarine-cloud-v2/main.go 
b/submarine-cloud-v2/pkg/signals/signal_posix.go
similarity index 62%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal_posix.go
index d59d924..763f80c 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal_posix.go
@@ -15,27 +15,11 @@
  * limitations under the License.
  */
 
-package main
+package signals
 
 import (
-       "flag"
-       "k8s.io/klog/v2"
        "os"
+       "syscall"
 )
 
-var (
-       masterURL  string
-       kubeconfig string
-)
-
-func main() {
-       klog.InitFlags(nil)
-       flag.Parse()
-
-       // TODO: Create a Submarine operator    
-}
-
-func init() {
-       flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + 
"/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
-       flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes 
API server. Overrides any value in kubeconfig. Only required if 
out-of-cluster.")
-}
\ No newline at end of file
+var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
diff --git a/submarine-cloud-v2/main.go 
b/submarine-cloud-v2/pkg/signals/signal_windows.go
similarity index 62%
copy from submarine-cloud-v2/main.go
copy to submarine-cloud-v2/pkg/signals/signal_windows.go
index d59d924..08d95fc 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/pkg/signals/signal_windows.go
@@ -15,27 +15,10 @@
  * limitations under the License.
  */
 
-package main
+package signals
 
 import (
-       "flag"
-       "k8s.io/klog/v2"
        "os"
 )
 
-var (
-       masterURL  string
-       kubeconfig string
-)
-
-func main() {
-       klog.InitFlags(nil)
-       flag.Parse()
-
-       // TODO: Create a Submarine operator    
-}
-
-func init() {
-       flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("HOME") + 
"/.kube/config", "Path to a kubeconfig. Only required if out-of-cluster.")
-       flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes 
API server. Overrides any value in kubeconfig. Only required if 
out-of-cluster.")
-}
\ No newline at end of file
+var shutdownSignals = []os.Signal{os.Interrupt}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to