This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/master by this push:
new 44f1e40 Pod log implementation
44f1e40 is described below
commit 44f1e40dcf038ada075f57ef073c3f113d7adbcb
Author: nferraro <[email protected]>
AuthorDate: Tue Sep 18 01:32:08 2018 +0200
Pod log implementation
---
Gopkg.lock | 1 +
pkg/client/cmd/run.go | 25 +++++
pkg/util/log/annotation_scraper.go | 166 +++++++++++++++++++++++++++++++
pkg/util/log/pod_scraper.go | 175 +++++++++++++++++++++++++++++++++
test/build_manager_integration_test.go | 8 +-
test/local_builder_integration_test.go | 14 +--
test/log_scrape_integration_test.go | 106 ++++++++++++++++++++
test/testing_env.go | 118 +++++++++++++++++++++-
8 files changed, 599 insertions(+), 14 deletions(-)
diff --git a/Gopkg.lock b/Gopkg.lock
index bc54a02..823b59c 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -736,6 +736,7 @@
"k8s.io/apimachinery/pkg/runtime/serializer/json",
"k8s.io/apimachinery/pkg/runtime/serializer/versioning",
"k8s.io/apimachinery/pkg/util/yaml",
+ "k8s.io/apimachinery/pkg/watch",
"k8s.io/client-go/plugin/pkg/client/auth/gcp",
"k8s.io/client-go/rest",
"k8s.io/client-go/tools/clientcmd",
diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go
index c4df815..171ed1d 100644
--- a/pkg/client/cmd/run.go
+++ b/pkg/client/cmd/run.go
@@ -32,6 +32,8 @@ import (
"github.com/spf13/cobra"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/apache/camel-k/pkg/util/log"
+ "io"
)
// NewCmdRun --
@@ -56,6 +58,7 @@ func NewCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command
{
cmd.Flags().StringSliceVarP(&options.Properties, "property", "p", nil,
"Add a camel property")
cmd.Flags().StringSliceVar(&options.ConfigMaps, "configmap", nil, "Add
a ConfigMap")
cmd.Flags().StringSliceVar(&options.Secrets, "secret", nil, "Add a
Secret")
+ cmd.Flags().BoolVar(&options.Logs, "logs", false, "Print integration
logs")
return &cmd
}
@@ -70,6 +73,7 @@ type runCmdOptions struct {
ConfigMaps []string
Secrets []string
Wait bool
+ Logs bool
}
func (*runCmdOptions) validateArgs(cmd *cobra.Command, args []string) error {
@@ -96,6 +100,12 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args
[]string) error {
return err
}
}
+ if o.Logs {
+ err = o.printLogs(integration)
+ if err != nil {
+ return err
+ }
+ }
return nil
}
@@ -137,6 +147,21 @@ watcher:
return nil
}
+func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error {
+ scraper := log.NewSelectorScraper(integration.Namespace,
"camel.apache.org/integration=" + integration.Name)
+ reader := scraper.Start(o.Context)
+ for {
+ str, err := reader.ReadString('\n')
+ if err == io.EOF || o.Context.Err() != nil {
+ break
+ } else if err != nil {
+ return err
+ }
+ fmt.Print(str)
+ }
+ return nil
+}
+
func (o *runCmdOptions) createIntegration(cmd *cobra.Command, args []string)
(*v1alpha1.Integration, error) {
code, err := o.loadCode(args[0])
if err != nil {
diff --git a/pkg/util/log/annotation_scraper.go
b/pkg/util/log/annotation_scraper.go
new file mode 100644
index 0000000..2e47b5e
--- /dev/null
+++ b/pkg/util/log/annotation_scraper.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+ "bufio"
+ "io"
+ "context"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/operator-framework/operator-sdk/pkg/sdk"
+ "k8s.io/api/core/v1"
+ "sync"
+ "sync/atomic"
+ "strconv"
+ "github.com/sirupsen/logrus"
+ "time"
+)
+
+// SelectorScraper scrapes all pods with a given selector
+type SelectorScraper struct {
+ namespace string
+ labelSelector string
+ podScrapers sync.Map
+ counter uint64
+}
+
+// NewSelectorScraper creates a new SelectorScraper
+func NewSelectorScraper(namespace string, labelSelector string)
*SelectorScraper {
+ return &SelectorScraper{
+ namespace: namespace,
+ labelSelector: labelSelector,
+ }
+}
+
+// Start returns a reader that streams the log of all selected pods
+func (s *SelectorScraper) Start(ctx context.Context) *bufio.Reader {
+ pipeIn, pipeOut := io.Pipe()
+ bufPipeIn := bufio.NewReader(pipeIn)
+ bufPipeOut := bufio.NewWriter(pipeOut)
+ closeFun := func() error {
+ bufPipeOut.Flush()
+ return pipeOut.Close()
+ }
+ go s.periodicSynchronize(ctx, bufPipeOut, closeFun)
+ return bufPipeIn
+}
+
+func (s *SelectorScraper) periodicSynchronize(ctx context.Context, out
*bufio.Writer, clientCloser func() error) {
+ err := s.synchronize(ctx, out, clientCloser)
+ if err != nil {
+ logrus.Warn("Could not synchronize log by label " +
s.labelSelector)
+ }
+ select {
+ case <- ctx.Done():
+ // cleanup
+ s.podScrapers.Range(func(k, v interface{}) bool {
+ if canc, isCanc := v.(context.CancelFunc); isCanc {
+ canc()
+ }
+
+ return true
+ })
+ clientCloser()
+ case <- time.After(2*time.Second):
+ go s.periodicSynchronize(ctx, out, clientCloser)
+ }
+}
+
+func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer,
clientCloser func() error) error {
+ list, err := s.listPods()
+ if err != nil {
+ return err
+ }
+
+ present := make(map[string]bool)
+ for _, pod := range list.Items {
+ present[pod.Name] = true
+ if _, ok := s.podScrapers.Load(pod.Name); !ok {
+ s.addPodScraper(ctx, pod.Name, out)
+ }
+ }
+
+ toBeRemoved := make(map[string]bool)
+ s.podScrapers.Range(func(k, v interface{}) bool {
+ if str, isStr := k.(string); isStr {
+ if _, ok := present[str]; !ok {
+ toBeRemoved[str] = true
+ }
+ }
+
+ return true
+ })
+
+ for podName := range toBeRemoved {
+ if scr, ok := s.podScrapers.Load(podName); ok {
+ if canc, ok2 := scr.(context.CancelFunc); ok2 {
+ canc()
+ s.podScrapers.Delete(podName)
+ }
+ }
+ }
+ return nil
+}
+
+func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out
*bufio.Writer) {
+ podScraper := NewPodScraper(s.namespace, name)
+ podCtx, podCancel := context.WithCancel(ctx)
+ id := atomic.AddUint64(&s.counter, 1)
+ prefix := "[" + strconv.FormatUint(id, 10) + "] "
+ podReader := podScraper.Start(podCtx)
+ s.podScrapers.Store(name, podCancel)
+ go func() {
+ defer podCancel()
+
+ out.WriteString(prefix + "Monitoring pod " + name)
+ for {
+ str, err := podReader.ReadString('\n')
+ if err == io.EOF {
+ return
+ } else if err != nil {
+ logrus.Error("Cannot read from pod stream: ",
err)
+ return
+ }
+ out.WriteString(prefix + str)
+ out.Flush()
+ if podCtx.Err() != nil {
+ return
+ }
+ }
+ }()
+
+}
+
+func (s *SelectorScraper) listPods() (*v1.PodList, error) {
+ list := v1.PodList{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Pod",
+ APIVersion: v1.SchemeGroupVersion.String(),
+ },
+ }
+
+ err := sdk.List(s.namespace, &list,
sdk.WithListOptions(&metav1.ListOptions{
+ LabelSelector: s.labelSelector,
+ }))
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &list, nil
+}
diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
new file mode 100644
index 0000000..798f5b7
--- /dev/null
+++ b/pkg/util/log/pod_scraper.go
@@ -0,0 +1,175 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+ "bufio"
+ "context"
+ "github.com/operator-framework/operator-sdk/pkg/k8sclient"
+ "github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+ "io"
+ "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "time"
+)
+
+// PodScraper scrapes logs of a specific pod
+type PodScraper struct {
+ namespace string
+ name string
+}
+
+// NewPodScraper creates a new pod scraper
+func NewPodScraper(namespace string, name string) *PodScraper {
+ return &PodScraper{
+ namespace: namespace,
+ name: name,
+ }
+}
+
+// Start returns a reader that streams the pod logs
+func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
+ pipeIn, pipeOut := io.Pipe()
+ bufPipeIn := bufio.NewReader(pipeIn)
+ bufPipeOut := bufio.NewWriter(pipeOut)
+ closeFun := func() error {
+ bufPipeOut.Flush()
+ return pipeOut.Close()
+ }
+ go s.doScrape(ctx, bufPipeOut, closeFun)
+ return bufPipeIn
+}
+
+func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer,
clientCloser func() error) {
+ err := s.waitForPodRunning(ctx, s.namespace, s.name)
+ if err != nil {
+ s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+ return
+ }
+
+ byteReader, err :=
k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name,
&v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
+ if err != nil {
+ s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+ return
+ }
+
+ reader := bufio.NewReader(byteReader)
+ err = nil
+ for err == nil {
+ str, err := reader.ReadString('\n')
+ if err != nil {
+ break
+ }
+ _, err = out.WriteString(str)
+ if err != nil {
+ break
+ }
+ out.Flush()
+ }
+ if err == io.EOF {
+ return
+ }
+
+ s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
+}
+
+func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait
time.Duration, out *bufio.Writer, clientCloser func() error) {
+ if err != nil {
+ logrus.Warn(errors.Wrap(err, "error caught during log scraping
for pod "+s.name))
+ }
+
+ if ctx.Err() != nil {
+ logrus.Info("Pod ", s.name, " will no longer be monitored")
+ clientCloser()
+ return
+ }
+
+ logrus.Info("Retrying to scrape pod ", s.name, " logs in ",
wait.Seconds(), " seconds...")
+ select {
+ case <-time.After(wait):
+ break
+ case <-ctx.Done():
+ clientCloser()
+ return
+ }
+
+ s.doScrape(ctx, out, clientCloser)
+}
+
+// Waits for a given pod to reach the running state
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string,
name string) error {
+ pod := v1.Pod{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Pod",
+ APIVersion: v1.SchemeGroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: namespace,
+ },
+ }
+ resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion,
pod.Kind, pod.Namespace)
+ if err != nil {
+ return err
+ }
+ watcher, err := resourceClient.Watch(metav1.ListOptions{
+ FieldSelector: "metadata.name=" + pod.Name,
+ })
+ if err != nil {
+ return err
+ }
+ events := watcher.ResultChan()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case e, ok := <-events:
+ if !ok {
+ return errors.New("event channel closed")
+ }
+
+ if e.Object != nil {
+ if runtimeUnstructured, ok :=
e.Object.(runtime.Unstructured); ok {
+ unstr := unstructured.Unstructured{
+ Object:
runtimeUnstructured.UnstructuredContent(),
+ }
+ pcopy := pod.DeepCopy()
+ err :=
k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy)
+ if err != nil {
+ return err
+ }
+
+ if pcopy.Status.Phase == v1.PodRunning {
+ return nil
+ }
+ }
+ } else if e.Type == watch.Deleted || e.Type ==
watch.Error {
+ return errors.New("unable to watch pod " +
s.name)
+ }
+ case <-time.After(30 * time.Second):
+ return errors.New("no state change after 30 seconds for
pod " + s.name)
+ }
+ }
+
+ return nil
+}
diff --git a/test/build_manager_integration_test.go
b/test/build_manager_integration_test.go
index d031fb4..a352009 100644
--- a/test/build_manager_integration_test.go
+++ b/test/build_manager_integration_test.go
@@ -34,7 +34,7 @@ import (
func TestBuildManagerBuild(t *testing.T) {
ctx := context.TODO()
- buildManager := build.NewManager(ctx, GetTargetNamespace())
+ buildManager := build.NewManager(ctx, getTargetNamespace())
identifier := buildapi.BuildIdentifier{
Name: "man-test",
Qualifier: digest.Random(),
@@ -42,7 +42,7 @@ func TestBuildManagerBuild(t *testing.T) {
buildManager.Start(buildapi.BuildSource{
Identifier: identifier,
Code: buildapi.Code{
- Content: TimerToLogIntegrationCode(),
+ Content: createTimerToLogIntegrationCode(),
},
Dependencies: []string{
"mvn:org.apache.camel/camel-core",
@@ -68,7 +68,7 @@ func TestBuildManagerBuild(t *testing.T) {
func TestBuildManagerFailedBuild(t *testing.T) {
ctx := context.TODO()
- buildManager := build.NewManager(ctx, GetTargetNamespace())
+ buildManager := build.NewManager(ctx, getTargetNamespace())
identifier := buildapi.BuildIdentifier{
Name: "man-test-2",
Qualifier: digest.Random(),
@@ -76,7 +76,7 @@ func TestBuildManagerFailedBuild(t *testing.T) {
buildManager.Start(buildapi.BuildSource{
Identifier: identifier,
Code: buildapi.Code{
- Content: TimerToLogIntegrationCode(),
+ Content: createTimerToLogIntegrationCode(),
},
Dependencies: []string{
"mvn:org.apache.camel/camel-cippalippa",
diff --git a/test/local_builder_integration_test.go
b/test/local_builder_integration_test.go
index 79cedeb..61983ce 100644
--- a/test/local_builder_integration_test.go
+++ b/test/local_builder_integration_test.go
@@ -34,7 +34,7 @@ import (
func TestLocalBuild(t *testing.T) {
ctx := context.TODO()
- builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+ builder := local.NewLocalBuilder(ctx, getTargetNamespace())
execution := builder.Build(build.BuildSource{
Identifier: build.BuildIdentifier{
@@ -42,7 +42,7 @@ func TestLocalBuild(t *testing.T) {
Qualifier: digest.Random(),
},
Code: build.Code{
- Content: TimerToLogIntegrationCode(),
+ Content: createTimerToLogIntegrationCode(),
},
})
@@ -54,7 +54,7 @@ func TestLocalBuild(t *testing.T) {
func TestLocalDoubleBuild(t *testing.T) {
ctx := context.TODO()
- builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+ builder := local.NewLocalBuilder(ctx, getTargetNamespace())
execution1 := builder.Build(build.BuildSource{
Identifier: build.BuildIdentifier{
@@ -62,7 +62,7 @@ func TestLocalDoubleBuild(t *testing.T) {
Qualifier: digest.Random(),
},
Code: build.Code{
- Content: TimerToLogIntegrationCode(),
+ Content: createTimerToLogIntegrationCode(),
},
})
@@ -72,7 +72,7 @@ func TestLocalDoubleBuild(t *testing.T) {
Qualifier: digest.Random(),
},
Code: build.Code{
- Content: TimerToLogIntegrationCode(),
+ Content: createTimerToLogIntegrationCode(),
},
})
@@ -86,7 +86,7 @@ func TestLocalDoubleBuild(t *testing.T) {
func TestLocalFailedBuild(t *testing.T) {
ctx := context.TODO()
- builder := local.NewLocalBuilder(ctx, GetTargetNamespace())
+ builder := local.NewLocalBuilder(ctx, getTargetNamespace())
execution := builder.Build(build.BuildSource{
Identifier: build.BuildIdentifier{
@@ -94,7 +94,7 @@ func TestLocalFailedBuild(t *testing.T) {
Qualifier: digest.Random(),
},
Code: build.Code{
- Content: TimerToLogIntegrationCode(),
+ Content: createTimerToLogIntegrationCode(),
},
Dependencies: []string{
"camel:cippalippa",
diff --git a/test/log_scrape_integration_test.go
b/test/log_scrape_integration_test.go
new file mode 100644
index 0000000..2052cc5
--- /dev/null
+++ b/test/log_scrape_integration_test.go
@@ -0,0 +1,106 @@
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go ->
Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package test
+
+import (
+ "testing"
+ "github.com/apache/camel-k/pkg/util/log"
+ "context"
+ "time"
+ "github.com/stretchr/testify/assert"
+ "strings"
+ "github.com/operator-framework/operator-sdk/pkg/sdk"
+)
+
+func TestPodLogScrape(t *testing.T) {
+ token := "Hello Camel K!"
+ pod, err := createDummyPod("scraped", "/bin/sh", "-c", "for i in `seq 1
50`; do echo \""+token+"\" && sleep 2; done")
+ defer sdk.Delete(pod)
+ assert.Nil(t, err)
+
+ ctx, _ := context.WithDeadline(context.Background(),
time.Now().Add(30*time.Second))
+ scraper := log.NewPodScraper(pod.Namespace, pod.Name)
+ in := scraper.Start(ctx)
+
+ res := make(chan bool)
+ go func() {
+ for {
+ if dl, _ := ctx.Deadline(); time.Now().After(dl) {
+ return
+ }
+
+ str, _ := in.ReadString('\n')
+ if strings.Contains(str, token) {
+ res <- true
+ return
+ }
+ }
+ }()
+
+ select {
+ case <-res:
+ break
+ case <-time.After(30 * time.Second):
+ assert.Fail(t, "timeout while waiting from token")
+ }
+}
+
+func TestSelectorLogScrape(t *testing.T) {
+ token := "Hello Camel K!"
+ replicas := int32(3)
+ deployment, err := createDummyDeployment("scraped-deployment",
&replicas, "scrape", "me", "/bin/sh", "-c", "for i in `seq 1 50`; do echo
\""+token+"\" && sleep 2; done")
+ defer sdk.Delete(deployment)
+ assert.Nil(t, err)
+
+ ctx, _ := context.WithDeadline(context.Background(),
time.Now().Add(30*time.Second))
+ scraper := log.NewSelectorScraper(deployment.Namespace, "scrape=me")
+ in := scraper.Start(ctx)
+
+ res := make(chan string)
+ go func() {
+ for {
+ if dl, _ := ctx.Deadline(); time.Now().After(dl) {
+ return
+ }
+
+ str, _ := in.ReadString('\n')
+ if strings.Contains(str, token) {
+ res <- str[0:3]
+ }
+ }
+ }()
+
+ recv := make(map[string]bool)
+loop:
+ for {
+ select {
+ case r := <-res:
+ recv[r] = true
+ if len(recv) == 3 {
+ break loop
+ }
+ case <-time.After(13 * time.Second):
+ assert.Fail(t, "timeout while waiting from token")
+ break loop
+ }
+ }
+}
diff --git a/test/testing_env.go b/test/testing_env.go
index 3b1e215..10fc8e5 100644
--- a/test/testing_env.go
+++ b/test/testing_env.go
@@ -24,6 +24,12 @@ package test
import (
"github.com/apache/camel-k/pkg/install"
"github.com/apache/camel-k/pkg/util/kubernetes"
+ "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/operator-framework/operator-sdk/pkg/sdk"
+ "time"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ appsv1 "k8s.io/api/apps/v1"
)
func init() {
@@ -35,13 +41,13 @@ func init() {
panic(err)
}
- err = install.Operator(GetTargetNamespace())
+ err = install.Operator(getTargetNamespace())
if err != nil {
panic(err)
}
}
-func GetTargetNamespace() string {
+func getTargetNamespace() string {
ns, err := kubernetes.GetClientCurrentNamespace("")
if err != nil {
panic(err)
@@ -49,7 +55,7 @@ func GetTargetNamespace() string {
return ns
}
-func TimerToLogIntegrationCode() string {
+func createTimerToLogIntegrationCode() string {
return `
import org.apache.camel.builder.RouteBuilder;
@@ -64,3 +70,109 @@ public class Routes extends RouteBuilder {
}
`
}
+
+func createDummyDeployment(name string, replicas *int32, labelKey string,
labelValue string, command ...string) (*appsv1.Deployment, error) {
+ deployment := getDummyDeployment(name, replicas, labelKey, labelValue,
command...)
+ gracePeriod := int64(0)
+ err := sdk.Delete(&deployment,
sdk.WithDeleteOptions(&metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}))
+ if err != nil && !k8serrors.IsNotFound(err) {
+ return nil, err
+ }
+ for {
+ list := v1.PodList{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Pod",
+ APIVersion: v1.SchemeGroupVersion.String(),
+ },
+ }
+
+ err := sdk.List(getTargetNamespace(), &list,
sdk.WithListOptions(&metav1.ListOptions{
+ LabelSelector: labelKey + "=" + labelValue,
+ }))
+ if err != nil {
+ return nil, err
+ }
+
+ if len(list.Items) > 0 {
+ time.Sleep(1*time.Second)
+ } else {
+ break
+ }
+ }
+ err = sdk.Create(&deployment)
+ return &deployment, err
+}
+
+func getDummyDeployment(name string, replicas *int32, labelKey string,
labelValue string, command ...string) appsv1.Deployment {
+ return appsv1.Deployment{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Deployment",
+ APIVersion: appsv1.SchemeGroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: getTargetNamespace(),
+ },
+ Spec: appsv1.DeploymentSpec{
+ Replicas: replicas,
+ Selector: &metav1.LabelSelector{
+ MatchLabels: map[string]string{
+ labelKey: labelValue,
+ },
+ },
+ Template: v1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels: map[string]string{
+ labelKey: labelValue,
+ },
+ },
+ Spec: getDummyPod(name, command...).Spec,
+ },
+ },
+ }
+}
+
+func createDummyPod(name string, command ...string) (*v1.Pod, error) {
+ pod := getDummyPod(name, command...)
+ gracePeriod := int64(0)
+ err := sdk.Delete(&pod,
sdk.WithDeleteOptions(&metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}))
+ if err != nil && !k8serrors.IsNotFound(err) {
+ return nil, err
+ }
+ for {
+ err := sdk.Create(&pod)
+ if err != nil && k8serrors.IsAlreadyExists(err) {
+ time.Sleep(1 * time.Second)
+ } else if err != nil {
+ return nil, err
+ } else {
+ break
+ }
+ }
+ return &pod, nil
+}
+
+func getDummyPod(name string, command ...string) v1.Pod {
+ gracePeriod := int64(0)
+ return v1.Pod{
+ TypeMeta: metav1.TypeMeta{
+ Kind: "Pod",
+ APIVersion: v1.SchemeGroupVersion.String(),
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: getTargetNamespace(),
+ Name: name,
+ },
+ Spec: v1.PodSpec{
+ TerminationGracePeriodSeconds: &gracePeriod,
+ Containers: []v1.Container{
+ {
+ Name: name,
+ Image: "busybox",
+ Command: command,
+ },
+ },
+ },
+ }
+}
+