lburgazzoli closed pull request #86: Pod log implementation
URL: https://github.com/apache/camel-k/pull/86
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/Gopkg.lock b/Gopkg.lock
index bc54a02..823b59c 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -736,6 +736,7 @@
     "k8s.io/apimachinery/pkg/runtime/serializer/json",
     "k8s.io/apimachinery/pkg/runtime/serializer/versioning",
     "k8s.io/apimachinery/pkg/util/yaml",
+    "k8s.io/apimachinery/pkg/watch",
     "k8s.io/client-go/plugin/pkg/client/auth/gcp",
     "k8s.io/client-go/rest",
     "k8s.io/client-go/tools/clientcmd",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index c4df815..171ed1d 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -32,6 +32,8 @@ import (
        "github.com/spf13/cobra"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/apache/camel-k/pkg/util/log"
+       "io"
 )
 
 // NewCmdRun --
@@ -56,6 +58,7 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command 
{
        cmd.Flags().StringSliceVarP(&options.Properties, "property", "p", nil, 
"Add a camel property")
        cmd.Flags().StringSliceVar(&options.ConfigMaps, "configmap", nil, "Add 
a ConfigMap")
        cmd.Flags().StringSliceVar(&options.Secrets, "secret", nil, "Add a 
Secret")
+       cmd.Flags().BoolVar(&options.Logs, "logs", false, "Print integration 
logs")
 
        return &cmd
 }
@@ -70,6 +73,7 @@ type runCmdOptions struct {
        ConfigMaps         []string
        Secrets            []string
        Wait               bool
+       Logs               bool
 }
 
 func (*runCmdOptions) validateArgs(cmd *cobra.Command, args []string) error {
@@ -96,6 +100,12 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args 
[]string) error {
                        return err
                }
        }
+       if o.Logs {
+               err = o.printLogs(integration)
+               if err != nil {
+                       return err
+               }
+       }
        return nil
 }
 
@@ -137,6 +147,21 @@ watcher:
        return nil
 }
 
+func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error {
+       scraper := log.NewSelectorScraper(integration.Namespace, 
"camel.apache.org/integration=" + integration.Name)
+       reader := scraper.Start(o.Context)
+       for {
+               str, err := reader.ReadString('\n')
+               if err == io.EOF || o.Context.Err() != nil {
+                       break
+               } else if err != nil {
+                       return err
+               }
+               fmt.Print(str)
+       }
+       return nil
+}
+
 func (o *runCmdOptions) createIntegration(cmd *cobra.Command, args []string) 
(*v1alpha1.Integration, error) {
        code, err := o.loadCode(args[0])
        if err != nil {
diff --git a/pkg/util/log/annotation_scraper.go 
b/pkg/util/log/annotation_scraper.go
new file mode 100644
index 0000000..2e47b5e
--- /dev/null
+++ b/pkg/util/log/annotation_scraper.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+       "bufio"
+       "io"
+       "context"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+       "k8s.io/api/core/v1"
+       "sync"
+       "sync/atomic"
+       "strconv"
+       "github.com/sirupsen/logrus"
+       "time"
+)
+
+// SelectorScraper scrapes all pods with a given selector
+type SelectorScraper struct {
+       namespace     string
+       labelSelector string
+       podScrapers   sync.Map
+       counter       uint64
+}
+
+// NewSelectorScraper creates a new SelectorScraper
+func NewSelectorScraper(namespace string, labelSelector string) 
*SelectorScraper {
+       return &SelectorScraper{
+               namespace:     namespace,
+               labelSelector: labelSelector,
+       }
+}
+
+// Start returns a reader that streams the log of all selected pods
+func (s *SelectorScraper) Start(ctx context.Context) *bufio.Reader {
+       pipeIn, pipeOut := io.Pipe()
+       bufPipeIn := bufio.NewReader(pipeIn)
+       bufPipeOut := bufio.NewWriter(pipeOut)
+       closeFun := func() error {
+               bufPipeOut.Flush()
+               return pipeOut.Close()
+       }
+       go s.periodicSynchronize(ctx, bufPipeOut, closeFun)
+       return bufPipeIn
+}
+
+func (s *SelectorScraper) periodicSynchronize(ctx context.Context, out 
*bufio.Writer, clientCloser func() error) {
+       err := s.synchronize(ctx, out, clientCloser)
+       if err != nil {
+               logrus.Warn("Could not synchronize log by label " + 
s.labelSelector)
+       }
+       select {
+       case <- ctx.Done():
+               // cleanup
+               s.podScrapers.Range(func(k, v interface{}) bool {
+                       if canc, isCanc := v.(context.CancelFunc); isCanc {
+                               canc()
+                       }
+
+                       return true
+               })
+               clientCloser()
+       case <- time.After(2*time.Second):
+               go s.periodicSynchronize(ctx, out, clientCloser)
+       }
+}
+
+func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer, 
clientCloser func() error) error {
+       list, err := s.listPods()
+       if err != nil {
+               return err
+       }
+
+       present := make(map[string]bool)
+       for _, pod := range list.Items {
+               present[pod.Name] = true
+               if _, ok := s.podScrapers.Load(pod.Name); !ok {
+                       s.addPodScraper(ctx, pod.Name, out)
+               }
+       }
+
+       toBeRemoved := make(map[string]bool)
+       s.podScrapers.Range(func(k, v interface{}) bool {
+               if str, isStr := k.(string); isStr {
+                       if _, ok := present[str]; !ok {
+                               toBeRemoved[str] = true
+                       }
+               }
+
+               return true
+       })
+
+       for podName := range toBeRemoved {
+               if scr, ok := s.podScrapers.Load(podName); ok {
+                       if canc, ok2 := scr.(context.CancelFunc); ok2 {
+                               canc()
+                               s.podScrapers.Delete(podName)
+                       }
+               }
+       }
+       return nil
+}
+
+func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out 
*bufio.Writer) {
+       podScraper := NewPodScraper(s.namespace, name)
+       podCtx, podCancel := context.WithCancel(ctx)
+       id := atomic.AddUint64(&s.counter, 1)
+       prefix := "[" + strconv.FormatUint(id, 10) + "] "
+       podReader := podScraper.Start(podCtx)
+       s.podScrapers.Store(name, podCancel)
+       go func() {
+               defer podCancel()
+
+               out.WriteString(prefix + "Monitoring pod " + name)
+               for {
+                       str, err := podReader.ReadString('\n')
+                       if err == io.EOF {
+                               return
+                       } else if err != nil {
+                               logrus.Error("Cannot read from pod stream: ", 
err)
+                               return
+                       }
+                       out.WriteString(prefix + str)
+                       out.Flush()
+                       if podCtx.Err() != nil {
+                               return
+                       }
+               }
+       }()
+
+}
+
+func (s *SelectorScraper) listPods() (*v1.PodList, error) {
+       list := v1.PodList{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: v1.SchemeGroupVersion.String(),
+               },
+       }
+
+       err := sdk.List(s.namespace, &list, 
sdk.WithListOptions(&metav1.ListOptions{
+               LabelSelector: s.labelSelector,
+       }))
+
+       if err != nil {
+               return nil, err
+       }
+
+       return &list, nil
+}
diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
new file mode 100644
index 0000000..798f5b7
--- /dev/null
+++ b/pkg/util/log/pod_scraper.go
@@ -0,0 +1,175 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+       "bufio"
+       "context"
+       "github.com/operator-framework/operator-sdk/pkg/k8sclient"
+       "github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+       "github.com/pkg/errors"
+       "github.com/sirupsen/logrus"
+       "io"
+       "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/watch"
+       "time"
+)
+
+// PodScraper scrapes logs of a specific pod
+type PodScraper struct {
+       namespace string
+       name      string
+}
+
+// NewPodScraper creates a new pod scraper
+func NewPodScraper(namespace string, name string) *PodScraper {
+       return &PodScraper{
+               namespace: namespace,
+               name:      name,
+       }
+}
+
+// Start returns a reader that streams the pod logs
+func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
+       pipeIn, pipeOut := io.Pipe()
+       bufPipeIn := bufio.NewReader(pipeIn)
+       bufPipeOut := bufio.NewWriter(pipeOut)
+       closeFun := func() error {
+               bufPipeOut.Flush()
+               return pipeOut.Close()
+       }
+       go s.doScrape(ctx, bufPipeOut, closeFun)
+       return bufPipeIn
+}
+
+func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, 
clientCloser func() error) {
+       err := s.waitForPodRunning(ctx, s.namespace, s.name)
+       if err != nil {
+               s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+               return
+       }
+
+       byteReader, err := 
k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, 
&v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
+       if err != nil {
+               s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+               return
+       }
+
+       reader := bufio.NewReader(byteReader)
+       err = nil
+       for err == nil {
+               str, err := reader.ReadString('\n')
+               if err != nil {
+                       break
+               }
+               _, err = out.WriteString(str)
+               if err != nil {
+                       break
+               }
+               out.Flush()
+       }
+       if err == io.EOF {
+               return
+       }
+
+       s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+}
+
+func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait 
time.Duration, out *bufio.Writer, clientCloser func() error) {
+       if err != nil {
+               logrus.Warn(errors.Wrap(err, "error caught during log scraping 
for pod "+s.name))
+       }
+
+       if ctx.Err() != nil {
+               logrus.Info("Pod ", s.name, " will no longer be monitored")
+               clientCloser()
+               return
+       }
+
+       logrus.Info("Retrying to scrape pod ", s.name, " logs in ", 
wait.Seconds(), " seconds...")
+       select {
+       case <-time.After(wait):
+               break
+       case <-ctx.Done():
+               clientCloser()
+               return
+       }
+
+       s.doScrape(ctx, out, clientCloser)
+}
+
+// Waits for a given pod to reach the running state
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, 
name string) error {
+       pod := v1.Pod{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: v1.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      name,
+                       Namespace: namespace,
+               },
+       }
+       resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, 
pod.Kind, pod.Namespace)
+       if err != nil {
+               return err
+       }
+       watcher, err := resourceClient.Watch(metav1.ListOptions{
+               FieldSelector: "metadata.name=" + pod.Name,
+       })
+       if err != nil {
+               return err
+       }
+       events := watcher.ResultChan()
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case e, ok := <-events:
+                       if !ok {
+                               return errors.New("event channel closed")
+                       }
+
+                       if e.Object != nil {
+                               if runtimeUnstructured, ok := 
e.Object.(runtime.Unstructured); ok {
+                                       unstr := unstructured.Unstructured{
+                                               Object: 
runtimeUnstructured.UnstructuredContent(),
+                                       }
+                                       pcopy := pod.DeepCopy()
+                                       err := 
k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy)
+                                       if err != nil {
+                                               return err
+                                       }
+
+                                       if pcopy.Status.Phase == v1.PodRunning {
+                                               return nil
+                                       }
+                               }
+                       } else if e.Type == watch.Deleted || e.Type == 
watch.Error {
+                               return errors.New("unable to watch pod " + 
s.name)
+                       }
+               case <-time.After(30 * time.Second):
+                       return errors.New("no state change after 30 seconds for 
pod " + s.name)
+               }
+       }
+
+       return nil
+}
diff --git a/test/build_manager_integration_test.go 
b/test/build_manager_integration_test.go
index d031fb4..a352009 100644
--- a/test/build_manager_integration_test.go
+++ b/test/build_manager_integration_test.go
@@ -34,7 +34,7 @@ import (
 
 func TestBuildManagerBuild(t *testing.T) {
        ctx := context.TODO()
-       buildManager := build.NewManager(ctx, GetTargetNamespace())
+       buildManager := build.NewManager(ctx, getTargetNamespace())
        identifier := buildapi.BuildIdentifier{
                Name:   "man-test",
                Qualifier: digest.Random(),
@@ -42,7 +42,7 @@ func TestBuildManagerBuild(t *testing.T) {
        buildManager.Start(buildapi.BuildSource{
                Identifier: identifier,
                Code: buildapi.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
                Dependencies: []string{
                        "mvn:org.apache.camel/camel-core",
@@ -68,7 +68,7 @@ func TestBuildManagerBuild(t *testing.T) {
 func TestBuildManagerFailedBuild(t *testing.T) {
 
        ctx := context.TODO()
-       buildManager := build.NewManager(ctx, GetTargetNamespace())
+       buildManager := build.NewManager(ctx, getTargetNamespace())
        identifier := buildapi.BuildIdentifier{
                Name:   "man-test-2",
                Qualifier: digest.Random(),
@@ -76,7 +76,7 @@ func TestBuildManagerFailedBuild(t *testing.T) {
        buildManager.Start(buildapi.BuildSource{
                Identifier: identifier,
                Code: buildapi.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
                Dependencies: []string{
                        "mvn:org.apache.camel/camel-cippalippa",
diff --git a/test/local_builder_integration_test.go 
b/test/local_builder_integration_test.go
index 79cedeb..61983ce 100644
--- a/test/local_builder_integration_test.go
+++ b/test/local_builder_integration_test.go
@@ -34,7 +34,7 @@ import (
 func TestLocalBuild(t *testing.T) {
 
        ctx := context.TODO()
-       builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+       builder := local.NewLocalBuilder(ctx, getTargetNamespace())
 
        execution := builder.Build(build.BuildSource{
                Identifier: build.BuildIdentifier{
@@ -42,7 +42,7 @@ func TestLocalBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
        })
 
@@ -54,7 +54,7 @@ func TestLocalBuild(t *testing.T) {
 func TestLocalDoubleBuild(t *testing.T) {
 
        ctx := context.TODO()
-       builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+       builder := local.NewLocalBuilder(ctx, getTargetNamespace())
 
        execution1 := builder.Build(build.BuildSource{
                Identifier: build.BuildIdentifier{
@@ -62,7 +62,7 @@ func TestLocalDoubleBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
        })
 
@@ -72,7 +72,7 @@ func TestLocalDoubleBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
        })
 
@@ -86,7 +86,7 @@ func TestLocalDoubleBuild(t *testing.T) {
 func TestLocalFailedBuild(t *testing.T) {
 
        ctx := context.TODO()
-       builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+       builder := local.NewLocalBuilder(ctx, getTargetNamespace())
 
        execution := builder.Build(build.BuildSource{
                Identifier: build.BuildIdentifier{
@@ -94,7 +94,7 @@ func TestLocalFailedBuild(t *testing.T) {
                        Qualifier: digest.Random(),
                },
                Code: build.Code{
-                       Content: TimerToLogIntegrationCode(),
+                       Content: createTimerToLogIntegrationCode(),
                },
                Dependencies: []string{
                        "camel:cippalippa",
diff --git a/test/log_scrape_integration_test.go 
b/test/log_scrape_integration_test.go
new file mode 100644
index 0000000..2052cc5
--- /dev/null
+++ b/test/log_scrape_integration_test.go
@@ -0,0 +1,106 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> 
Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package test
+
+import (
+       "testing"
+       "github.com/apache/camel-k/pkg/util/log"
+       "context"
+       "time"
+       "github.com/stretchr/testify/assert"
+       "strings"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+)
+
+func TestPodLogScrape(t *testing.T) {
+       token := "Hello Camel K!"
+       pod, err := createDummyPod("scraped", "/bin/sh", "-c", "for i in `seq 1 
50`; do echo \""+token+"\" && sleep 2; done")
+       defer sdk.Delete(pod)
+       assert.Nil(t, err)
+
+       ctx, _ := context.WithDeadline(context.Background(), 
time.Now().Add(30*time.Second))
+       scraper := log.NewPodScraper(pod.Namespace, pod.Name)
+       in := scraper.Start(ctx)
+
+       res := make(chan bool)
+       go func() {
+               for {
+                       if dl, _ := ctx.Deadline(); time.Now().After(dl) {
+                               return
+                       }
+
+                       str, _ := in.ReadString('\n')
+                       if strings.Contains(str, token) {
+                               res <- true
+                               return
+                       }
+               }
+       }()
+
+       select {
+       case <-res:
+               break
+       case <-time.After(30 * time.Second):
+               assert.Fail(t, "timeout while waiting from token")
+       }
+}
+
+func TestSelectorLogScrape(t *testing.T) {
+       token := "Hello Camel K!"
+       replicas := int32(3)
+       deployment, err := createDummyDeployment("scraped-deployment", 
&replicas, "scrape", "me", "/bin/sh", "-c", "for i in `seq 1 50`; do echo 
\""+token+"\" && sleep 2; done")
+       defer sdk.Delete(deployment)
+       assert.Nil(t, err)
+
+       ctx, _ := context.WithDeadline(context.Background(), 
time.Now().Add(30*time.Second))
+       scraper := log.NewSelectorScraper(deployment.Namespace, "scrape=me")
+       in := scraper.Start(ctx)
+
+       res := make(chan string)
+       go func() {
+               for {
+                       if dl, _ := ctx.Deadline(); time.Now().After(dl) {
+                               return
+                       }
+
+                       str, _ := in.ReadString('\n')
+                       if strings.Contains(str, token) {
+                               res <- str[0:3]
+                       }
+               }
+       }()
+
+       recv := make(map[string]bool)
+loop:
+       for {
+               select {
+               case r := <-res:
+                       recv[r] = true
+                       if len(recv) == 3 {
+                               break loop
+                       }
+               case <-time.After(13 * time.Second):
+                       assert.Fail(t, "timeout while waiting from token")
+                       break loop
+               }
+       }
+}
diff --git a/test/testing_env.go b/test/testing_env.go
index 3b1e215..10fc8e5 100644
--- a/test/testing_env.go
+++ b/test/testing_env.go
@@ -24,6 +24,12 @@ package test
 import (
        "github.com/apache/camel-k/pkg/install"
        "github.com/apache/camel-k/pkg/util/kubernetes"
+       "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+       "time"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       appsv1 "k8s.io/api/apps/v1"
 )
 
 func init() {
@@ -35,13 +41,13 @@ func init() {
                panic(err)
        }
 
-       err = install.Operator(GetTargetNamespace())
+       err = install.Operator(getTargetNamespace())
        if err != nil {
                panic(err)
        }
 }
 
-func GetTargetNamespace() string {
+func getTargetNamespace() string {
        ns, err := kubernetes.GetClientCurrentNamespace("")
        if err != nil {
                panic(err)
@@ -49,7 +55,7 @@ func GetTargetNamespace() string {
        return ns
 }
 
-func TimerToLogIntegrationCode() string {
+func createTimerToLogIntegrationCode() string {
        return `
 import org.apache.camel.builder.RouteBuilder;
 
@@ -64,3 +70,109 @@ public class Routes extends RouteBuilder {
 }
 `
 }
+
+func createDummyDeployment(name string, replicas *int32, labelKey string, 
labelValue string, command ...string) (*appsv1.Deployment, error) {
+       deployment := getDummyDeployment(name, replicas, labelKey, labelValue, 
command...)
+       gracePeriod := int64(0)
+       err := sdk.Delete(&deployment, 
sdk.WithDeleteOptions(&metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}))
+       if err != nil && !k8serrors.IsNotFound(err) {
+               return nil, err
+       }
+       for {
+               list := v1.PodList{
+                       TypeMeta: metav1.TypeMeta{
+                               Kind:       "Pod",
+                               APIVersion: v1.SchemeGroupVersion.String(),
+                       },
+               }
+
+               err := sdk.List(getTargetNamespace(), &list, 
sdk.WithListOptions(&metav1.ListOptions{
+                       LabelSelector: labelKey + "=" + labelValue,
+               }))
+               if err != nil {
+                       return nil, err
+               }
+
+               if len(list.Items) > 0 {
+                       time.Sleep(1*time.Second)
+               } else {
+                       break
+               }
+       }
+       err = sdk.Create(&deployment)
+       return &deployment, err
+}
+
+func getDummyDeployment(name string, replicas *int32, labelKey string, 
labelValue string, command ...string) appsv1.Deployment {
+       return appsv1.Deployment{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Deployment",
+                       APIVersion: appsv1.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:        name,
+                       Namespace:   getTargetNamespace(),
+               },
+               Spec: appsv1.DeploymentSpec{
+                       Replicas: replicas,
+                       Selector: &metav1.LabelSelector{
+                               MatchLabels: map[string]string{
+                                       labelKey: labelValue,
+                               },
+                       },
+                       Template: v1.PodTemplateSpec{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Labels: map[string]string{
+                                               labelKey: labelValue,
+                                       },
+                               },
+                               Spec: getDummyPod(name, command...).Spec,
+                       },
+               },
+       }
+}
+
+func createDummyPod(name string, command ...string) (*v1.Pod, error) {
+       pod := getDummyPod(name, command...)
+       gracePeriod := int64(0)
+       err := sdk.Delete(&pod, 
sdk.WithDeleteOptions(&metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}))
+       if err != nil && !k8serrors.IsNotFound(err) {
+               return nil, err
+       }
+       for {
+               err := sdk.Create(&pod)
+               if err != nil && k8serrors.IsAlreadyExists(err) {
+                       time.Sleep(1 * time.Second)
+               } else if err != nil {
+                       return nil, err
+               } else {
+                       break
+               }
+       }
+       return &pod, nil
+}
+
+func getDummyPod(name string, command ...string) v1.Pod {
+       gracePeriod := int64(0)
+       return v1.Pod{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Pod",
+                       APIVersion: v1.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: getTargetNamespace(),
+                       Name:      name,
+               },
+               Spec: v1.PodSpec{
+                       TerminationGracePeriodSeconds: &gracePeriod,
+                       Containers: []v1.Container{
+                               {
+                                       Name:    name,
+                                       Image:   "busybox",
+                                       Command: command,
+                               },
+                       },
+               },
+       }
+}
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to