This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 4caea26e68f8b0c90dbd8f79b0cc2aef0b577d40 Author: lburgazzoli <[email protected]> AuthorDate: Fri Oct 12 16:29:59 2018 +0200 chore(kamel): simplify integration status watcher --- cmd/kamel/kamel.go | 7 +++++- pkg/client/cmd/run.go | 50 +++++++++++++----------------------- pkg/util/util.go | 21 ++++++++++++++++ pkg/util/watch/watch.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 33 deletions(-) diff --git a/cmd/kamel/kamel.go b/cmd/kamel/kamel.go index a89042f..502a978 100644 --- a/cmd/kamel/kamel.go +++ b/cmd/kamel/kamel.go @@ -31,7 +31,11 @@ import ( func main() { rand.Seed(time.Now().UTC().UnixNano()) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + + // Cancel ctx as soon as main returns + defer cancel() + rootCmd, err := cmd.NewKamelCommand(ctx) exitOnError(err) @@ -42,6 +46,7 @@ func main() { func exitOnError(err error) { if err != nil { fmt.Println("Error:", err) + os.Exit(1) } } diff --git a/pkg/client/cmd/run.go b/pkg/client/cmd/run.go index 5b6664d..a22b58f 100644 --- a/pkg/client/cmd/run.go +++ b/pkg/client/cmd/run.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" + "io" "io/ioutil" "net/http" "os" @@ -34,8 +35,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - "io" - "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/log" @@ -165,41 +164,28 @@ func (o *runCmdOptions) run(cmd *cobra.Command, args []string) error { } func (o *runCmdOptions) waitForIntegrationReady(integration *v1alpha1.Integration) error { - // Block this goroutine until the integration is in a final status - changes, err := watch.StateChanges(o.Context, integration) - if err != nil { - return err - } - - var lastStatusSeen *v1alpha1.IntegrationStatus - -watcher: - for { - select { - case <-o.Context.Done(): - return nil - case i, ok := <-changes: - if !ok { - break watcher + handler := func(i *v1alpha1.Integration) bool { + // + // TODO when we add health checks, we should wait until they are passed + // + if i.Status.Phase != "" { + fmt.Println("integration \""+integration.Name+"\" in phase", i.Status.Phase) + + if i.Status.Phase == v1alpha1.IntegrationPhaseRunning { + // TODO display some error info when available in the status + return false } - lastStatusSeen = &i.Status - phase := string(i.Status.Phase) - if phase != "" { - fmt.Println("integration \""+integration.Name+"\" in phase", phase) - // TODO when we add health checks, we should wait until they are passed - if i.Status.Phase == v1alpha1.IntegrationPhaseRunning || i.Status.Phase == v1alpha1.IntegrationPhaseError { - // TODO display some error info when available in the status - break watcher - } + + if i.Status.Phase == v1alpha1.IntegrationPhaseError { + fmt.Println("integration deployment failed") + return false } } - } - // TODO we may not be able to reach this state, since the build will be done without sources (until we add health checks) - if lastStatusSeen != nil && lastStatusSeen.Phase == v1alpha1.IntegrationPhaseError { - return errors.New("integration deployment failed") + return true } - return nil + + return watch.HandleStateChanges(o.Context, integration, handler) } func (o *runCmdOptions) printLogs(integration *v1alpha1.Integration) error { diff --git a/pkg/util/util.go b/pkg/util/util.go index 7792210..dcfbd36 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -17,6 +17,12 @@ limitations under the License. package util +import ( + "os" + "os/signal" + "syscall" +) + // StringSliceContains -- func StringSliceContains(slice []string, items []string) bool { for i := 0; i < len(items); i++ { @@ -51,3 +57,18 @@ func StringSliceUniqueAdd(slice *[]string, item string) bool { return true } + +// WaitForSignal -- +func WaitForSignal(sig chan os.Signal, exit func(int)) { + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE) + go func() { + s := <-sig + switch s { + case syscall.SIGINT, syscall.SIGTERM: + exit(130) // Ctrl+c + case syscall.SIGPIPE: + exit(0) + } + exit(1) + }() +} diff --git a/pkg/util/watch/watch.go b/pkg/util/watch/watch.go index 1ed9671..73d865e 100644 --- a/pkg/util/watch/watch.go +++ b/pkg/util/watch/watch.go @@ -47,7 +47,9 @@ func StateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-cha var lastObservedState *v1alpha1.IntegrationPhase go func() { + defer watcher.Stop() defer close(out) + for { select { case <-ctx.Done(): @@ -81,3 +83,68 @@ func StateChanges(ctx context.Context, integration *v1alpha1.Integration) (<-cha return out, nil } + +// +// HandleStateChanges watches a integration resource and invoke the given handler when its status changes. +// +// err := watch.HandleStateChanges(ctx, integration, func(i *v1alpha1.Integration) bool { +// if i.Status.Phase == v1alpha1.IntegrationPhaseRunning { +// return false +// } +// +// return true +// }) +// +// This function blocks until the handler function returns true or either the events channel or the context is closed. +// +func HandleStateChanges(ctx context.Context, integration *v1alpha1.Integration, handler func(integration *v1alpha1.Integration) bool) error { + resourceClient, _, err := k8sclient.GetResourceClient(integration.APIVersion, integration.Kind, integration.Namespace) + if err != nil { + return err + } + watcher, err := resourceClient.Watch(metav1.ListOptions{ + FieldSelector: "metadata.name=" + integration.Name, + }) + if err != nil { + return err + } + + defer watcher.Stop() + events := watcher.ResultChan() + + var lastObservedState *v1alpha1.IntegrationPhase + + for { + select { + case <-ctx.Done(): + return nil + case e, ok := <-events: + if !ok { + return nil + } + + if e.Object != nil { + if runtimeUnstructured, ok := e.Object.(runtime.Unstructured); ok { + unstr := unstructured.Unstructured{ + Object: runtimeUnstructured.UnstructuredContent(), + } + icopy := integration.DeepCopy() + err := k8sutil.UnstructuredIntoRuntimeObject(&unstr, icopy) + if err != nil { + logrus.Error("Unexpected error detected when watching resource", err) + return nil + } + + if lastObservedState == nil || *lastObservedState != icopy.Status.Phase { + lastObservedState = &icopy.Status.Phase + if !handler(icopy) { + return nil + } + } + } + } + } + } + + return nil +}
