This is an automated email from the ASF dual-hosted git repository.

kaihsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new d34e523  SUBMARINE-824. Execute port-forwarding automatically with 
Golang out-of-cluster
d34e523 is described below

commit d34e523cf1fd600e70e504e94db9a5af9978f2e0
Author: Kai-Hsun Chen <[email protected]>
AuthorDate: Tue May 18 00:35:58 2021 +0800

    SUBMARINE-824. Execute port-forwarding automatically with Golang 
out-of-cluster
    
    ### What is this PR for?
    Run the following command automatically from Go when the operator runs out-of-cluster:
    ```
    kubectl port-forward --address 0.0.0.0 -n ${Your_Namespace} service/traefik 
32080:80
    ```
    
    Reference:
    (1) https://gianarb.it/blog/programmatically-kube-port-forward-in-go
    (2) https://github.com/minio/operator/blob/master/kubectl-minio/cmd/proxy.go
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * Execute port-forwarding automatically with Golang in-cluster
         *  The function `k8sutil.ServicePortForwardPort(context.TODO(), 
namespace, "traefik", 32080, 80, color.FgGreen)` does not work in-cluster.
         * (In submarine-operator pod) `curl 127.0.0.1:32080` --> Yes
         * (Local Chrome browser) 127.0.0.1:32080 --> 404
         * We need to forward the request from 127.0.0.1:32080 to the 
submarine-operator pod. (Use k8s NodePort to implement)
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-824
    
    ### How should this be tested?
    
https://user-images.githubusercontent.com/20109646/118227081-86001080-b4ba-11eb-9274-6a053954f5f7.mov
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need new documentation? No
    
    Author: Kai-Hsun Chen <[email protected]>
    
    Signed-off-by: Kai-Hsun Chen <[email protected]>
    
    Closes #588 from kevin85421/SUBMARINE-824 and squashes the following 
commits:
    
    18d88a13 [Kai-Hsun Chen] Add incluster to Controller struct
    782354d6 [Kai-Hsun Chen] Kill port-forward process
    c7a8db9d [Kai-Hsun Chen] Update README.md
    92f768ea [Kai-Hsun Chen] SUBMARINE-824. Execute port-forwarding 
automatically with Golang out-of-cluster
    0266af38 [Kai-Hsun Chen] SUBMARINE-824. Execute port-forwarding 
automatically with Golang
---
 submarine-cloud-v2/Dockerfile                      |   6 +-
 submarine-cloud-v2/README.md                       |  44 +++++++--
 .../submarine-operator-service-account.yaml        |   1 +
 submarine-cloud-v2/controller.go                   |  41 ++++++--
 submarine-cloud-v2/main.go                         |   2 +-
 submarine-cloud-v2/pkg/k8sutil/portfwd.go          | 107 +++++++++++++++++++++
 6 files changed, 180 insertions(+), 21 deletions(-)

diff --git a/submarine-cloud-v2/Dockerfile b/submarine-cloud-v2/Dockerfile
index a4097aa..9d2dc21 100644
--- a/submarine-cloud-v2/Dockerfile
+++ b/submarine-cloud-v2/Dockerfile
@@ -19,7 +19,7 @@ MAINTAINER Apache Software Foundation 
<[email protected]>
 WORKDIR /usr/src
 
 RUN apt-get update &&\
-    apt-get install -y wget vim git
+    apt-get install -y wget vim git curl
 
 ENV GOROOT="/usr/local/go"
 ENV GOPATH=$HOME/gocode
@@ -29,6 +29,10 @@ ENV PATH=$PATH:$GOPATH:$GOBIN:$GOROOT/bin
 RUN wget https://dl.google.com/go/go1.16.2.linux-amd64.tar.gz &&\
     tar -C /usr/local -xzf go1.16.2.linux-amd64.tar.gz
 
+RUN curl -LO https://dl.k8s.io/release/v1.14.2/bin/linux/amd64/kubectl &&\
+    install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl &&\
+    kubectl version --client
+
 ADD charts/ /usr/src/charts
 
 ADD submarine-operator /usr/src
diff --git a/submarine-cloud-v2/README.md b/submarine-cloud-v2/README.md
index ace7f70..6744532 100644
--- a/submarine-cloud-v2/README.md
+++ b/submarine-cloud-v2/README.md
@@ -48,8 +48,24 @@ make test-unit
 
 # Run submarine-operator out-of-cluster
 ```bash
+# Step1: Build & Run "submarine-operator"
 go build -o submarine-operator
 ./submarine-operator
+
+# Step2: Deploy a submarine
+kubectl create ns submarine-admin
+kubectl apply -n submarine-admin -f artifacts/examples/example-submarine.yaml
+
+# Step3: "submarine-operator" will perform port-forwarding automatically.
+
+# Step4: View workbench (127.0.0.1:32080) with your web browser
+
+# Step5: Delete: 
+#   (1) Remove all relevant Helm chart releases
+#   (2) Remove all resources in the namespace "submarine-user-test"
+#   (3) Remove all non-namespaced resources (Ex: PersistentVolume) created by 
client-go API 
+#   (4) **Note:** The namespace "submarine-admin" will not be deleted
+kubectl delete submarine example-submarine -n submarine-admin 
 ```
 
 # Run operator in-cluster
@@ -65,19 +81,29 @@ kubectl apply -f 
artifacts/examples/submarine-operator-service-account.yaml
 kubectl apply -f artifacts/examples/submarine-operator.yaml
 
 # Step4: Deploy a submarine
-kubectl create ns submarine-operator-test
-kubectl apply -n submarine-operator-test -f 
artifacts/examples/example-submarine.yaml
+kubectl create ns submarine-admin
+kubectl apply -n submarine-admin -f artifacts/examples/example-submarine.yaml
 
 # Step5: Inspect submarine-operator POD logs 
-kubectl logs ${submarine-operator POD}
-```
+kubectl logs -f ${submarine-operator POD}
 
-# Create a Submarine in specific namespace and see workbench
+# Step6: The operator will create a new namespace "submarine-user-test"
+kubectl get all -n submarine-user-test 
 
-```bash
-kubectl create ns submarine-operator-test
-kubectl apply -n submarine-operator-test -f 
artifacts/examples/example-submarine.yaml
-kubectl port-forward --address 0.0.0.0 -n submarine-operator-test 
service/traefik 32080:80
+# Step7: port-forwarding
+kubectl port-forward --address 0.0.0.0 -n submarine-user-test service/traefik 
32080:80
+
+# Step8: View workbench (127.0.0.1:32080) with your web browser
+
+# Step9: Delete: 
+#   (1) Remove all relevant Helm chart releases
+#   (2) Remove all resources in the namespace "submarine-user-test"
+#   (3) Remove all non-namespaced resources (Ex: PersistentVolume) created by 
client-go API 
+#   (4) **Note:** The namespace "submarine-admin" will not be deleted
+kubectl delete submarine example-submarine -n submarine-admin 
+
+# Step10: Delete "submarine-operator"
+kubectl delete deployment submarine-operator-demo
 ```
 
 # Helm Golang API
diff --git 
a/submarine-cloud-v2/artifacts/examples/submarine-operator-service-account.yaml 
b/submarine-cloud-v2/artifacts/examples/submarine-operator-service-account.yaml
index bc7771c..4eafbe0 100644
--- 
a/submarine-cloud-v2/artifacts/examples/submarine-operator-service-account.yaml
+++ 
b/submarine-cloud-v2/artifacts/examples/submarine-operator-service-account.yaml
@@ -58,6 +58,7 @@ rules:
       - serviceaccounts
       - persistentvolumes
       - persistentvolumeclaims
+      - pods/portforward
     verbs:
       - "*"
   - apiGroups:
diff --git a/submarine-cloud-v2/controller.go b/submarine-cloud-v2/controller.go
index 863de53..d90e0f5 100644
--- a/submarine-cloud-v2/controller.go
+++ b/submarine-cloud-v2/controller.go
@@ -21,15 +21,16 @@ import (
        "context"
        "encoding/json"
        "fmt"
+
        clientset "submarine-cloud-v2/pkg/generated/clientset/versioned"
        submarinescheme 
"submarine-cloud-v2/pkg/generated/clientset/versioned/scheme"
        informers 
"submarine-cloud-v2/pkg/generated/informers/externalversions/submarine/v1alpha1"
        listers "submarine-cloud-v2/pkg/generated/listers/submarine/v1alpha1"
+       "submarine-cloud-v2/pkg/helm"
+       "submarine-cloud-v2/pkg/k8sutil"
        v1alpha1 "submarine-cloud-v2/pkg/submarine/v1alpha1"
        "time"
 
-       "submarine-cloud-v2/pkg/helm"
-
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
@@ -60,6 +61,9 @@ import (
        traefikinformers 
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/generated/informers/externalversions/traefik/v1alpha1"
        traefiklisters 
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/generated/listers/traefik/v1alpha1"
        traefikv1alpha1 
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1"
+
+       "github.com/fatih/color"
+       "os/exec"
 )
 
 const controllerAgentName = "submarine-controller"
@@ -96,7 +100,9 @@ type Controller struct {
 
        // TODO: Need to be modified to implement multi-tenant
        // Store charts
-       charts []helm.HelmUninstallInfo
+       charts     []helm.HelmUninstallInfo
+       portfwdCmd *exec.Cmd
+       incluster  bool
 }
 
 const (
@@ -112,6 +118,7 @@ type WorkQueueItem struct {
 
 // NewController returns a new sample controller
 func NewController(
+       incluster bool,
        kubeclientset kubernetes.Interface,
        submarineclientset clientset.Interface,
        traefikclientset traefik.Interface,
@@ -154,6 +161,8 @@ func NewController(
                clusterrolebindingLister:    
clusterrolebindingInformer.Lister(),
                workqueue:                   
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"Submarines"),
                recorder:                    recorder,
+               portfwdCmd:                  nil,
+               incluster:                   incluster,
        }
 
        // Setting up event handler for Submarine
@@ -794,8 +803,6 @@ func (c *Controller) newSubCharts(namespace string) error {
 
        // TODO: maintain "error"
        // TODO: (sample-controller) controller.go:287 ~ 293
-       // TODO: port-forward
-       //              kubectl port-forward --address 0.0.0.0 service/traefik 
32080:80
 
        return nil
 }
@@ -1134,6 +1141,17 @@ func (c *Controller) syncHandler(workqueueItem 
WorkQueueItem) error {
                if err != nil {
                        return err
                }
+
+               // Port-forwarding
+               // TODO:
+               //   (1) multi-tenant port-forwarding
+               //   (2) Basic operations: on/off/modify (change port)
+               //   (3) in-cluster
+               if action == ADD {
+                       if !c.incluster {
+                               c.portfwdCmd = 
k8sutil.ServicePortForwardPort(context.TODO(), newNamespace, "traefik", 32080, 
80, color.FgGreen)
+                       }
+               }
        } else { // Case: DELETE
                // Uninstall Helm charts
                for _, chart := range c.charts {
@@ -1148,11 +1166,6 @@ func (c *Controller) syncHandler(workqueueItem 
WorkQueueItem) error {
                }
                klog.Info("Delete Namespace: ", newNamespace)
 
-               err = 
c.kubeclientset.CoreV1().Namespaces().Delete(context.TODO(), namespace, 
metav1.DeleteOptions{})
-               if err != nil {
-                       return err
-               }
-
                // Delete non-namespaced resources (ex: PersistentVolume)
                err = 
c.kubeclientset.CoreV1().PersistentVolumes().Delete(context.TODO(), 
"submarine-database-pv--"+newNamespace, metav1.DeleteOptions{})
                if err != nil {
@@ -1163,6 +1176,14 @@ func (c *Controller) syncHandler(workqueueItem 
WorkQueueItem) error {
                if err != nil {
                        return err
                }
+
+               // Kill port-forward process:
+               if !c.incluster {
+                       err = c.portfwdCmd.Process.Kill()
+                       if err != nil {
+                               return err
+                       }
+               }
        }
 
        return nil
diff --git a/submarine-cloud-v2/main.go b/submarine-cloud-v2/main.go
index 1f1d1fb..4d7c8a2 100644
--- a/submarine-cloud-v2/main.go
+++ b/submarine-cloud-v2/main.go
@@ -84,7 +84,7 @@ func main() {
        //       ex: namespace informer
 
        // Create a Submarine operator
-       controller := NewController(kubeClient, submarineClient, traefikClient,
+       controller := NewController(incluster, kubeClient, submarineClient, 
traefikClient,
                kubeInformerFactory.Apps().V1().Deployments(),
                kubeInformerFactory.Core().V1().Services(),
                kubeInformerFactory.Core().V1().ServiceAccounts(),
diff --git a/submarine-cloud-v2/pkg/k8sutil/portfwd.go 
b/submarine-cloud-v2/pkg/k8sutil/portfwd.go
new file mode 100644
index 0000000..98ff136
--- /dev/null
+++ b/submarine-cloud-v2/pkg/k8sutil/portfwd.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Reference: 
https://github.com/minio/operator/blob/master/kubectl-minio/cmd/proxy.go#L131
+
+package k8sutil
+
+import (
+       "context"
+       "fmt"
+       "github.com/fatih/color"
+       "io"
+       "log"
+       "os/exec"
+       "strconv"
+       "sync"
+)
+
+// ServicePortForwardPort runs "kubectl port-forward" inside a goroutine and returns the *exec.Cmd so the caller can kill the process later
+func ServicePortForwardPort(ctx context.Context, namespace string, service 
string, localPort int, remotePort int, dcolor color.Attribute) *exec.Cmd {
+       // service we are going to forward
+       serviceName := fmt.Sprintf("service/%s", service)
+       // command to run
+       portStr := strconv.Itoa(localPort) + ":" + strconv.Itoa(remotePort)
+       cmd := exec.CommandContext(ctx, "kubectl", "port-forward", "--address", 
"0.0.0.0", "-n", namespace, serviceName, portStr)
+
+       go func(cmd *exec.Cmd) {
+               // prepare to capture the output
+               var errStdout, errStderr error
+               stdoutIn, _ := cmd.StdoutPipe()
+               stderrIn, _ := cmd.StderrPipe()
+               err := cmd.Start()
+               if err != nil {
+                       log.Fatalf("cmd.Start() failed with '%s'\n", err)
+               }
+
+               // cmd.Wait() should be called only after we finish reading
+               // from stdoutIn and stderrIn.
+               // wg ensures that the stdout-reading goroutine has finished first.
+               var wg sync.WaitGroup
+               wg.Add(1)
+               go func() {
+                       errStdout = copyAndCapture(stdoutIn, dcolor)
+                       wg.Done()
+               }()
+
+               errStderr = copyAndCapture(stderrIn, dcolor)
+
+               wg.Wait()
+
+               err = cmd.Wait()
+               if err != nil {
+                       if err.Error() == "signal: killed" {
+                               log.Printf("Stop port-forward\n")
+                       } else {
+                               log.Printf("cmd.Run() failed with %s\n", 
err.Error())
+                       }
+                       return
+               }
+               if errStdout != nil || errStderr != nil {
+                       log.Printf("failed to capture stdout or stderr\n")
+                       return
+               }
+       }(cmd)
+
+       return cmd
+}
+
+// capture and print the output of the command
+func copyAndCapture(r io.Reader, dcolor color.Attribute) error {
+       var out []byte
+       buf := make([]byte, 1024)
+       for {
+               n, err := r.Read(buf[:])
+               if n > 0 {
+                       d := buf[:n]
+                       out = append(out, d...)
+                       theColor := color.New(dcolor)
+                       _, err := theColor.Print(string(d))
+
+                       if err != nil {
+                               return err
+                       }
+               }
+               if err != nil {
+                       // Read returns io.EOF at the end of file, which is not 
an error for us
+                       if err == io.EOF {
+                               err = nil
+                       }
+                       return err
+               }
+       }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to