This is an automated email from the ASF dual-hosted git repository.

astefanutti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 48525daa7028d7fc298463a3649539faae1cfc26
Author: Antonin Stefanutti <[email protected]>
AuthorDate: Mon Aug 30 17:21:13 2021 +0200

    feat(native): Automatic rollout to highest priority kit
---
 pkg/controller/integration/build_kit.go            | 216 +--------------------
 pkg/controller/integration/build_kit_test.go       |   8 +-
 .../integration/integration_controller.go          |  66 ++++---
 .../integration/{build_kit.go => kits.go}          | 167 ++--------------
 pkg/controller/integration/monitor.go              |  55 +++++-
 5 files changed, 112 insertions(+), 400 deletions(-)

diff --git a/pkg/controller/integration/build_kit.go b/pkg/controller/integration/build_kit.go
index ff396dd..a0fee46 100644
--- a/pkg/controller/integration/build_kit.go
+++ b/pkg/controller/integration/build_kit.go
@@ -19,19 +19,10 @@ package integration
 
 import (
        "context"
-       "encoding/json"
-       "reflect"
 
        "github.com/pkg/errors"
 
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/apimachinery/pkg/labels"
-       "k8s.io/apimachinery/pkg/selection"
-
-       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
-
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
-       "github.com/apache/camel-k/pkg/platform"
        "github.com/apache/camel-k/pkg/trait"
        "github.com/apache/camel-k/pkg/util"
        "github.com/apache/camel-k/pkg/util/defaults"
@@ -64,7 +55,7 @@ func (action *buildKitAction) Handle(ctx context.Context, 
integration *v1.Integr
                }
 
                if kit.Labels[v1.IntegrationKitTypeLabel] == 
v1.IntegrationKitTypePlatform {
-                       match, err := action.integrationMatches(integration, 
kit)
+                       match, err := integrationMatches(integration, kit)
                        if err != nil {
                                return nil, err
                        } else if !match {
@@ -91,7 +82,7 @@ func (action *buildKitAction) Handle(ctx context.Context, 
integration *v1.Integr
                return nil, nil
        }
 
-       existingKits, err := action.lookupKitsForIntegration(ctx, 
action.client, integration)
+       existingKits, err := lookupKitsForIntegration(ctx, action.client, 
integration)
        if err != nil {
                return nil, err
        }
@@ -138,79 +129,6 @@ kits:
        return integration, nil
 }
 
-func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c 
ctrl.Reader, integration *v1.Integration) ([]v1.IntegrationKit, error) {
-       pl, err := platform.GetCurrent(ctx, c, integration.Namespace)
-       if err != nil && !k8serrors.IsNotFound(err) {
-               return nil, err
-       }
-
-       kitTypes, err := labels.NewRequirement(v1.IntegrationKitTypeLabel, 
selection.In, []string{
-               v1.IntegrationKitTypePlatform,
-               v1.IntegrationKitTypeExternal,
-       })
-       if err != nil {
-               return nil, err
-       }
-
-       options := []ctrl.ListOption{
-               ctrl.InNamespace(integration.GetIntegrationKitNamespace(pl)),
-               ctrl.MatchingLabels{
-                       "camel.apache.org/runtime.version":  
integration.Status.RuntimeVersion,
-                       "camel.apache.org/runtime.provider": 
string(integration.Status.RuntimeProvider),
-               },
-               ctrl.MatchingLabelsSelector{
-                       Selector: labels.NewSelector().Add(*kitTypes),
-               },
-       }
-
-       list := v1.NewIntegrationKitList()
-       if err := c.List(ctx, &list, options...); err != nil {
-               return nil, err
-       }
-
-       kits := make([]v1.IntegrationKit, 0)
-       for _, kit := range list.Items {
-               match, err := action.integrationMatches(integration, &kit)
-               if err != nil {
-                       return nil, err
-               } else if !match {
-                       continue
-               }
-               kits = append(kits, kit)
-       }
-
-       return kits, nil
-}
-
-// integrationMatches returns whether the v1.IntegrationKit meets the 
requirements of the v1.Integration
-func (action *buildKitAction) integrationMatches(integration *v1.Integration, 
kit *v1.IntegrationKit) (bool, error) {
-       if kit.Status.Phase == v1.IntegrationKitPhaseError {
-               return false, nil
-       }
-       if kit.Status.Version != integration.Status.Version {
-               return false, nil
-       }
-       if len(integration.Status.Dependencies) != len(kit.Spec.Dependencies) {
-               return false, nil
-       }
-       // When a platform kit is created it inherits the traits from the 
integrations and as
-       // some traits may influence the build thus the artifacts present on 
the container image,
-       // we need to take traits into account when looking up for compatible 
kits.
-       //
-       // It could also happen that an integration is updated and a trait is 
modified, if we do
-       // not include traits in the lookup, we may use a kit that does not 
have all the
-       // characteristics required by the integration.
-       //
-       // A kit can be used only if it contains a subset of the traits and 
related configurations
-       // declared on integration.
-       if match, err := action.hasMatchingTraits(integration, kit); !match || 
err != nil {
-               return false, err
-       }
-       if !util.StringSliceContains(kit.Spec.Dependencies, 
integration.Status.Dependencies) {
-               return false, nil
-       }
-       return true, nil
-}
 
 // kitMatches returns whether the v1.IntegrationKit match
 func (action *buildKitAction) kitMatches(k1 *v1.IntegrationKit, k2 
*v1.IntegrationKit) (bool, error) {
@@ -233,7 +151,7 @@ func (action *buildKitAction) kitMatches(k1 
*v1.IntegrationKit, k2 *v1.Integrati
                if !ok {
                        return false, nil
                }
-               match, err := action.hasMatchingTrait(&kt1, &kt2)
+               match, err := hasMatchingTrait(&kt1, &kt2)
                if !match || err != nil {
                        return false, err
                }
@@ -243,131 +161,3 @@ func (action *buildKitAction) kitMatches(k1 
*v1.IntegrationKit, k2 *v1.Integrati
        }
        return true, nil
 }
-
-// hasMatchingTraits compares the traits defined on the v1.Integration with 
those defined on the v1.IntegrationKit
-func (action *buildKitAction) hasMatchingTraits(integration *v1.Integration, 
kit *v1.IntegrationKit) (bool, error) {
-       catalog := trait.NewCatalog(action.client)
-
-       traitCount := 0
-       for name, itTrait := range integration.Spec.Traits {
-               t := catalog.GetTrait(name)
-               if t != nil && !t.InfluencesKit() {
-                       // We don't store the trait configuration if the trait 
cannot influence the kit behavior
-                       continue
-               }
-               traitCount++
-               kitTrait, ok := kit.Spec.Traits[name]
-               if !ok {
-                       // skip it because trait configured on integration is 
not defined on kit
-                       return false, nil
-               }
-               if ct, ok := t.(trait.ComparableTrait); ok {
-                       comparable, err := action.hasComparableTrait(ct, 
&itTrait, &kitTrait)
-                       if !comparable || err != nil {
-                               return false, err
-                       }
-               } else {
-                       match, err := action.hasMatchingTrait(&itTrait, 
&kitTrait)
-                       if !match || err != nil {
-                               return false, err
-                       }
-               }
-       }
-
-       // Check the number of influencing traits matches
-       if len(kit.Spec.Traits) != traitCount {
-               return false, nil
-       }
-
-       return true, nil
-}
-
-func (action *buildKitAction) hasComparableTrait(c trait.ComparableTrait, 
itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) {
-       it := reflect.New(reflect.TypeOf(c).Elem()).Interface()
-       data, err := json.Marshal(itTrait.Configuration)
-       if err != nil {
-               return false, err
-       }
-       err = json.Unmarshal(data, &it)
-       if err != nil {
-               return false, err
-       }
-
-       kt := reflect.New(reflect.TypeOf(c).Elem()).Interface()
-       data, err = json.Marshal(kitTrait.Configuration)
-       if err != nil {
-               return false, err
-       }
-       err = json.Unmarshal(data, &kt)
-       if err != nil {
-               return false, err
-       }
-
-       return kt.(trait.ComparableTrait).Matches(it.(trait.Trait)), nil
-}
-
-func (action *buildKitAction) hasMatchingTrait(itTrait *v1.TraitSpec, kitTrait 
*v1.TraitSpec) (bool, error) {
-       data, err := json.Marshal(itTrait.Configuration)
-       if err != nil {
-               return false, err
-       }
-       itConf := make(map[string]interface{})
-       err = json.Unmarshal(data, &itConf)
-       if err != nil {
-               return false, err
-       }
-       data, err = json.Marshal(kitTrait.Configuration)
-       if err != nil {
-               return false, err
-       }
-       kitConf := make(map[string]interface{})
-       err = json.Unmarshal(data, &kitConf)
-       if err != nil {
-               return false, err
-       }
-       for ck, cv := range kitConf {
-               iv, ok := itConf[ck]
-               if !ok {
-                       // skip it because trait configured on kit has a value 
that is not defined
-                       // in integration trait
-                       return false, nil
-               }
-               if !equal(iv, cv) {
-                       // skip it because trait configured on kit has a value 
that differs from
-                       // the one configured on integration
-                       return false, nil
-               }
-       }
-       return true, nil
-}
-
-// We need to try to perform a slice equality in order to prevent a runtime 
panic
-func equal(a, b interface{}) bool {
-       aSlice, aOk := a.([]interface{})
-       bSlice, bOk := b.([]interface{})
-
-       if aOk && bOk {
-               // Both are slices
-               return sliceEqual(aSlice, bSlice)
-       }
-
-       if aOk || bOk {
-               // One of the 2 is a slice
-               return false
-       }
-
-       // None is a slice
-       return a == b
-}
-
-func sliceEqual(a, b []interface{}) bool {
-       if len(a) != len(b) {
-               return false
-       }
-       for i, v := range a {
-               if v != b[i] {
-                       return false
-               }
-       }
-       return true
-}
diff --git a/pkg/controller/integration/build_kit_test.go b/pkg/controller/integration/build_kit_test.go
index a1538e3..430dc38 100644
--- a/pkg/controller/integration/build_kit_test.go
+++ b/pkg/controller/integration/build_kit_test.go
@@ -84,7 +84,7 @@ func TestLookupKitForIntegration_DiscardKitsInError(t 
*testing.T) {
        a.InjectLogger(log.Log)
        a.InjectClient(c)
 
-       kits, err := a.lookupKitsForIntegration(context.TODO(), c, 
&v1.Integration{
+       kits, err := lookupKitsForIntegration(context.TODO(), c, 
&v1.Integration{
                TypeMeta: metav1.TypeMeta{
                        APIVersion: v1.SchemeGroupVersion.String(),
                        Kind:       v1.IntegrationKind,
@@ -207,7 +207,7 @@ func 
TestLookupKitForIntegration_DiscardKitsWithIncompatibleTraits(t *testing.T)
        a.InjectLogger(log.Log)
        a.InjectClient(c)
 
-       kits, err := a.lookupKitsForIntegration(context.TODO(), c, 
&v1.Integration{
+       kits, err := lookupKitsForIntegration(context.TODO(), c, 
&v1.Integration{
                TypeMeta: metav1.TypeMeta{
                        APIVersion: v1.SchemeGroupVersion.String(),
                        Kind:       v1.IntegrationKind,
@@ -276,7 +276,7 @@ func TestHasMatchingTraits_KitNoTraitShouldNotBePicked(t 
*testing.T) {
        a := buildKitAction{}
        a.InjectLogger(log.Log)
 
-       ok, err := a.hasMatchingTraits(integration, integrationKitSpec)
+       ok, err := hasMatchingTraits(integration, integrationKitSpec)
        assert.Nil(t, err)
        assert.False(t, ok)
 }
@@ -327,7 +327,7 @@ func TestHasMatchingTraits_KitSameTraitShouldBePicked(t 
*testing.T) {
        a := buildKitAction{}
        a.InjectLogger(log.Log)
 
-       ok, err := a.hasMatchingTraits(integration, integrationKitSpec)
+       ok, err := hasMatchingTraits(integration, integrationKitSpec)
        assert.Nil(t, err)
        assert.True(t, ok)
 }
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index 0a4df39..55bf2ec 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -49,8 +49,6 @@ import (
        "github.com/apache/camel-k/pkg/util/monitoring"
 )
 
-// Add creates a new Integration Controller and adds it to the Manager. The 
Manager will set fields on the Controller
-// and Start it when the Manager is Started.
 func Add(mgr manager.Manager) error {
        c, err := client.FromManager(mgr)
        if err != nil {
@@ -59,7 +57,6 @@ func Add(mgr manager.Manager) error {
        return add(mgr, newReconciler(mgr, c), c)
 }
 
-// newReconciler returns a new reconcile.Reconciler
 func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler {
        return monitoring.NewInstrumentedReconciler(
                &reconcileIntegration{
@@ -75,7 +72,6 @@ func newReconciler(mgr manager.Manager, c client.Client) 
reconcile.Reconciler {
        )
 }
 
-// add adds a new Controller to mgr with r as the reconcile.Reconciler
 func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error {
        // Create a new controller
        c, err := controller.New("integration-controller", mgr, 
controller.Options{Reconciler: r})
@@ -92,7 +88,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl 
client.Client) error {
                                newIntegration := e.ObjectNew.(*v1.Integration)
                                // Ignore updates to the integration status in 
which case metadata.Generation does not change,
                                // or except when the integration phase changes 
as it's used to transition from one phase
-                               // to another
+                               // to another.
                                return oldIntegration.Generation != 
newIntegration.Generation ||
                                        oldIntegration.Status.Phase != 
newIntegration.Status.Phase
                        },
@@ -106,40 +102,46 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl 
client.Client) error {
                return err
        }
 
-       // Watch for IntegrationKit phase transitioning to ready or error and
-       // enqueue requests for any integrations that are in phase waiting for
-       // kit
+       // Watch for IntegrationKit phase transitioning to ready or error, and
+       // enqueue requests for any integration that matches the kit, in 
building
+       // or running phase.
        err = c.Watch(&source.Kind{Type: &v1.IntegrationKit{}},
                handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) 
[]reconcile.Request {
                        kit := a.(*v1.IntegrationKit)
                        var requests []reconcile.Request
 
-                       if kit.Status.Phase == v1.IntegrationKitPhaseReady || 
kit.Status.Phase == v1.IntegrationKitPhaseError {
-                               list := &v1.IntegrationList{}
+                       if kit.Status.Phase != v1.IntegrationKitPhaseReady && 
kit.Status.Phase != v1.IntegrationKitPhaseError {
+                               return requests
+                       }
 
-                               // Do global search in case of global operator 
(it may be using a global platform)
-                               var opts []ctrl.ListOption
-                               if !platform.IsCurrentOperatorGlobal() {
-                                       opts = append(opts, 
ctrl.InNamespace(kit.Namespace))
-                               }
+                       list := &v1.IntegrationList{}
+                       // Do global search in case of global operator (it may 
be using a global platform)
+                       var opts []ctrl.ListOption
+                       if !platform.IsCurrentOperatorGlobal() {
+                               opts = append(opts, 
ctrl.InNamespace(kit.Namespace))
+                       }
+                       if err := mgr.GetClient().List(context.Background(), 
list, opts...); err != nil {
+                               log.Error(err, "Failed to retrieve integration 
list")
+                               return requests
+                       }
 
-                               if err := mgr.GetClient().List(context.TODO(), 
list, opts...); err != nil {
-                                       log.Error(err, "Failed to retrieve 
integration list")
-                                       return requests
+                       for _, integration := range list.Items {
+                               if match, err := 
integrationMatches(&integration, kit); err != nil {
+                                       log.Errorf(err, "Error matching 
integration %q with kit %q", integration.Name, kit.Name)
+                                       continue
+                               } else if !match {
+                                       continue
                                }
-
-                               for _, integration := range list.Items {
-                                       if integration.Status.Phase == 
v1.IntegrationPhaseBuildingKit {
-                                               log.Infof("Kit %s ready, 
wake-up integration: %s", kit.Name, integration.Name)
-                                               requests = append(requests, 
reconcile.Request{
-                                                       NamespacedName: 
types.NamespacedName{
-                                                               Namespace: 
integration.Namespace,
-                                                               Name:      
integration.Name,
-                                                       },
-                                               })
-                                       }
+                               if integration.Status.Phase == 
v1.IntegrationPhaseBuildingKit ||
+                                       integration.Status.Phase == 
v1.IntegrationPhaseRunning {
+                                       log.Infof("Kit %s ready, notify 
integration: %s", kit.Name, integration.Name)
+                                       requests = append(requests, 
reconcile.Request{
+                                               NamespacedName: 
types.NamespacedName{
+                                                       Namespace: 
integration.Namespace,
+                                                       Name:      
integration.Name,
+                                               },
+                                       })
                                }
-
                        }
 
                        return requests
@@ -228,7 +230,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl 
client.Client) error {
                return err
        }
 
-       // Watch cronjob to update the ready condition
+       // Watch for CronJob to update the ready condition
        err = c.Watch(&source.Kind{Type: &v1beta1.CronJob{}}, 
&handler.EnqueueRequestForOwner{
                OwnerType:    &v1.Integration{},
                IsController: false,
@@ -261,7 +263,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl 
client.Client) error {
 
 var _ reconcile.Reconciler = &reconcileIntegration{}
 
-// reconcileIntegration reconciles a Integration object
+// reconcileIntegration reconciles an Integration object
 type reconcileIntegration struct {
        // This client, initialized using mgr.Client() above, is a split client
        // that reads objects from the cache and writes to the API server
diff --git a/pkg/controller/integration/build_kit.go b/pkg/controller/integration/kits.go
similarity index 53%
copy from pkg/controller/integration/build_kit.go
copy to pkg/controller/integration/kits.go
index ff396dd..2a7eab0 100644
--- a/pkg/controller/integration/build_kit.go
+++ b/pkg/controller/integration/kits.go
@@ -22,8 +22,6 @@ import (
        "encoding/json"
        "reflect"
 
-       "github.com/pkg/errors"
-
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/selection"
@@ -34,111 +32,9 @@ import (
        "github.com/apache/camel-k/pkg/platform"
        "github.com/apache/camel-k/pkg/trait"
        "github.com/apache/camel-k/pkg/util"
-       "github.com/apache/camel-k/pkg/util/defaults"
-       "github.com/apache/camel-k/pkg/util/kubernetes"
 )
 
-func newBuildKitAction() Action {
-       return &buildKitAction{}
-}
-
-type buildKitAction struct {
-       baseAction
-}
-
-func (action *buildKitAction) Name() string {
-       return "build-kit"
-}
-
-func (action *buildKitAction) CanHandle(integration *v1.Integration) bool {
-       return integration.Status.Phase == v1.IntegrationPhaseBuildingKit
-}
-
-func (action *buildKitAction) Handle(ctx context.Context, integration 
*v1.Integration) (*v1.Integration, error) {
-       // TODO: we may need to add a timeout strategy, i.e give up after some 
time in case of an unrecoverable error.
-
-       if integration.Status.IntegrationKit != nil {
-               kit, err := kubernetes.GetIntegrationKit(ctx, action.client, 
integration.Status.IntegrationKit.Name, 
integration.Status.IntegrationKit.Namespace)
-               if err != nil {
-                       return nil, errors.Wrapf(err, "unable to find 
integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, 
integration.Status.IntegrationKit.Name, err)
-               }
-
-               if kit.Labels[v1.IntegrationKitTypeLabel] == 
v1.IntegrationKitTypePlatform {
-                       match, err := action.integrationMatches(integration, 
kit)
-                       if err != nil {
-                               return nil, err
-                       } else if !match {
-                               // We need to re-generate a kit, or search for 
a new one that
-                               // matches the integration, so let's remove the 
association
-                               // with the kit.
-                               
integration.SetIntegrationKit(&v1.IntegrationKit{})
-                               return integration, nil
-                       }
-               }
-
-               if kit.Status.Phase == v1.IntegrationKitPhaseError {
-                       integration.Status.Phase = v1.IntegrationPhaseError
-                       integration.SetIntegrationKit(kit)
-                       return integration, nil
-               }
-
-               if kit.Status.Phase == v1.IntegrationKitPhaseReady {
-                       integration.Status.Phase = v1.IntegrationPhaseDeploying
-                       integration.SetIntegrationKit(kit)
-                       return integration, nil
-               }
-
-               return nil, nil
-       }
-
-       existingKits, err := action.lookupKitsForIntegration(ctx, 
action.client, integration)
-       if err != nil {
-               return nil, err
-       }
-
-       env, err := trait.Apply(ctx, action.client, integration, nil)
-       if err != nil {
-               return nil, err
-       }
-
-       var integrationKit *v1.IntegrationKit
-kits:
-       for _, kit := range env.IntegrationKits {
-               kit := kit
-               for i, k := range existingKits {
-                       match, err := action.kitMatches(&kit, &k)
-                       if err != nil {
-                               return nil, err
-                       }
-                       if match {
-                               if integrationKit == nil ||
-                                       integrationKit.Status.Phase != 
v1.IntegrationKitPhaseReady && k.Status.Phase == v1.IntegrationKitPhaseReady ||
-                                       integrationKit.Status.Phase == 
v1.IntegrationKitPhaseReady && k.Status.Phase == v1.IntegrationKitPhaseReady && 
k.HasHigherPriorityThan(integrationKit) {
-                                       integrationKit = &existingKits[i]
-                               }
-                               continue kits
-                       }
-               }
-               if err := action.client.Create(ctx, &kit); err != nil {
-                       return nil, err
-               }
-               if integrationKit == nil {
-                       integrationKit = &kit
-               }
-       }
-
-       // Set the kit name so the next handle loop, will fall through the
-       // same path as integration with a user defined kit
-       integration.SetIntegrationKit(integrationKit)
-
-       if integrationKit.Status.Phase == v1.IntegrationKitPhaseReady {
-               integration.Status.Phase = v1.IntegrationPhaseDeploying
-       }
-
-       return integration, nil
-}
-
-func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c 
ctrl.Reader, integration *v1.Integration) ([]v1.IntegrationKit, error) {
+func lookupKitsForIntegration(ctx context.Context, c ctrl.Reader, integration 
*v1.Integration, options ...ctrl.ListOption) ([]v1.IntegrationKit, error) {
        pl, err := platform.GetCurrent(ctx, c, integration.Namespace)
        if err != nil && !k8serrors.IsNotFound(err) {
                return nil, err
@@ -152,7 +48,7 @@ func (action *buildKitAction) lookupKitsForIntegration(ctx 
context.Context, c ct
                return nil, err
        }
 
-       options := []ctrl.ListOption{
+       listOptions := []ctrl.ListOption{
                ctrl.InNamespace(integration.GetIntegrationKitNamespace(pl)),
                ctrl.MatchingLabels{
                        "camel.apache.org/runtime.version":  
integration.Status.RuntimeVersion,
@@ -162,15 +58,16 @@ func (action *buildKitAction) lookupKitsForIntegration(ctx 
context.Context, c ct
                        Selector: labels.NewSelector().Add(*kitTypes),
                },
        }
+       listOptions = append(listOptions, options...)
 
        list := v1.NewIntegrationKitList()
-       if err := c.List(ctx, &list, options...); err != nil {
+       if err := c.List(ctx, &list, listOptions...); err != nil {
                return nil, err
        }
 
        kits := make([]v1.IntegrationKit, 0)
        for _, kit := range list.Items {
-               match, err := action.integrationMatches(integration, &kit)
+               match, err := integrationMatches(integration, &kit)
                if err != nil {
                        return nil, err
                } else if !match {
@@ -183,13 +80,19 @@ func (action *buildKitAction) lookupKitsForIntegration(ctx 
context.Context, c ct
 }
 
 // integrationMatches returns whether the v1.IntegrationKit meets the 
requirements of the v1.Integration
-func (action *buildKitAction) integrationMatches(integration *v1.Integration, 
kit *v1.IntegrationKit) (bool, error) {
+func integrationMatches(integration *v1.Integration, kit *v1.IntegrationKit) 
(bool, error) {
        if kit.Status.Phase == v1.IntegrationKitPhaseError {
                return false, nil
        }
        if kit.Status.Version != integration.Status.Version {
                return false, nil
        }
+       if kit.Status.RuntimeProvider != integration.Status.RuntimeProvider {
+               return false, nil
+       }
+       if kit.Status.RuntimeVersion != integration.Status.RuntimeVersion {
+               return false, nil
+       }
        if len(integration.Status.Dependencies) != len(kit.Spec.Dependencies) {
                return false, nil
        }
@@ -203,7 +106,7 @@ func (action *buildKitAction) 
integrationMatches(integration *v1.Integration, ki
        //
        // A kit can be used only if it contains a subset of the traits and 
related configurations
        // declared on integration.
-       if match, err := action.hasMatchingTraits(integration, kit); !match || 
err != nil {
+       if match, err := hasMatchingTraits(integration, kit); !match || err != 
nil {
                return false, err
        }
        if !util.StringSliceContains(kit.Spec.Dependencies, 
integration.Status.Dependencies) {
@@ -212,41 +115,9 @@ func (action *buildKitAction) 
integrationMatches(integration *v1.Integration, ki
        return true, nil
 }
 
-// kitMatches returns whether the v1.IntegrationKit match
-func (action *buildKitAction) kitMatches(k1 *v1.IntegrationKit, k2 
*v1.IntegrationKit) (bool, error) {
-       version := k1.Status.Version
-       if version == "" {
-               // Defaults with the version that is going to be set during the 
kit initialization
-               version = defaults.Version
-       }
-       if version != k2.Status.Version {
-               return false, nil
-       }
-       if len(k1.Spec.Dependencies) != len(k2.Spec.Dependencies) {
-               return false, nil
-       }
-       if len(k1.Spec.Traits) != len(k2.Spec.Traits) {
-               return false, nil
-       }
-       for name, kt1 := range k1.Spec.Traits {
-               kt2, ok := k2.Spec.Traits[name]
-               if !ok {
-                       return false, nil
-               }
-               match, err := action.hasMatchingTrait(&kt1, &kt2)
-               if !match || err != nil {
-                       return false, err
-               }
-       }
-       if !util.StringSliceContains(k1.Spec.Dependencies, 
k2.Spec.Dependencies) {
-               return false, nil
-       }
-       return true, nil
-}
-
 // hasMatchingTraits compares the traits defined on the v1.Integration with 
those defined on the v1.IntegrationKit
-func (action *buildKitAction) hasMatchingTraits(integration *v1.Integration, 
kit *v1.IntegrationKit) (bool, error) {
-       catalog := trait.NewCatalog(action.client)
+func hasMatchingTraits(integration *v1.Integration, kit *v1.IntegrationKit) 
(bool, error) {
+       catalog := trait.NewCatalog(nil)
 
        traitCount := 0
        for name, itTrait := range integration.Spec.Traits {
@@ -262,12 +133,12 @@ func (action *buildKitAction) 
hasMatchingTraits(integration *v1.Integration, kit
                        return false, nil
                }
                if ct, ok := t.(trait.ComparableTrait); ok {
-                       comparable, err := action.hasComparableTrait(ct, 
&itTrait, &kitTrait)
+                       comparable, err := hasComparableTrait(ct, &itTrait, 
&kitTrait)
                        if !comparable || err != nil {
                                return false, err
                        }
                } else {
-                       match, err := action.hasMatchingTrait(&itTrait, 
&kitTrait)
+                       match, err := hasMatchingTrait(&itTrait, &kitTrait)
                        if !match || err != nil {
                                return false, err
                        }
@@ -282,7 +153,7 @@ func (action *buildKitAction) hasMatchingTraits(integration 
*v1.Integration, kit
        return true, nil
 }
 
-func (action *buildKitAction) hasComparableTrait(c trait.ComparableTrait, 
itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) {
+func hasComparableTrait(c trait.ComparableTrait, itTrait *v1.TraitSpec, 
kitTrait *v1.TraitSpec) (bool, error) {
        it := reflect.New(reflect.TypeOf(c).Elem()).Interface()
        data, err := json.Marshal(itTrait.Configuration)
        if err != nil {
@@ -306,7 +177,7 @@ func (action *buildKitAction) hasComparableTrait(c 
trait.ComparableTrait, itTrai
        return kt.(trait.ComparableTrait).Matches(it.(trait.Trait)), nil
 }
 
-func (action *buildKitAction) hasMatchingTrait(itTrait *v1.TraitSpec, kitTrait 
*v1.TraitSpec) (bool, error) {
+func hasMatchingTrait(itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, 
error) {
        data, err := json.Marshal(itTrait.Configuration)
        if err != nil {
                return false, err
diff --git a/pkg/controller/integration/monitor.go 
b/pkg/controller/integration/monitor.go
index 858b22b..532ad8f 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,13 +19,16 @@ package integration
 
 import (
        "context"
+       "strconv"
 
        "github.com/pkg/errors"
 
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/apimachinery/pkg/selection"
 
-       k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/trait"
@@ -51,6 +54,7 @@ func (action *monitorAction) CanHandle(integration 
*v1.Integration) bool {
 }
 
 func (action *monitorAction) Handle(ctx context.Context, integration 
*v1.Integration) (*v1.Integration, error) {
+       // Check if the Integration has changed
        hash, err := digest.ComputeForIntegration(integration)
        if err != nil {
                return nil, err
@@ -70,6 +74,29 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
                return nil, errors.Wrapf(err, "unable to find integration kit 
%s/%s, %s", integration.Status.IntegrationKit.Namespace, 
integration.Status.IntegrationKit.Name, err)
        }
 
+       // Check if an IntegrationKit with higher priority is ready
+       priority, ok := kit.Labels[v1.IntegrationKitPriorityLabel]
+       if !ok {
+               priority = "0"
+       }
+       withHigherPriority, err := 
labels.NewRequirement(v1.IntegrationKitPriorityLabel, selection.GreaterThan, 
[]string{priority})
+       if err != nil {
+               return nil, err
+       }
+       kits, err := lookupKitsForIntegration(ctx, action.client, integration, 
ctrl.MatchingLabelsSelector{
+               Selector: labels.NewSelector().Add(*withHigherPriority),
+       })
+       if err != nil {
+               return nil, err
+       }
+       priorityReadyKit, err := findHighestPriorityReadyKit(kits)
+       if err != nil {
+               return nil, err
+       }
+       if priorityReadyKit != nil {
+               integration.SetIntegrationKit(priorityReadyKit)
+       }
+
        // Run traits that are enabled for the running phase
        _, err = trait.Apply(ctx, action.client, integration, kit)
        if err != nil {
@@ -84,8 +111,8 @@ func (action *monitorAction) Handle(ctx context.Context, 
integration *v1.Integra
        // Check replicas
        replicaSets := &appsv1.ReplicaSetList{}
        err = action.client.List(ctx, replicaSets,
-               k8sclient.InNamespace(integration.Namespace),
-               k8sclient.MatchingLabels{
+               ctrl.InNamespace(integration.Namespace),
+               ctrl.MatchingLabels{
                        v1.IntegrationLabel: integration.Name,
                })
        if err != nil {
@@ -160,3 +187,25 @@ func findLatestReplicaSet(list *appsv1.ReplicaSetList) 
*appsv1.ReplicaSet {
        }
        return &latest
 }
+
+// findHighestPriorityReadyKit returns the ready kit with the greatest
+// priority label strictly above zero, or nil if no kit qualifies. A ready
+// kit whose priority label is not an integer yields the strconv.Atoi error.
+func findHighestPriorityReadyKit(kits []v1.IntegrationKit) 
(*v1.IntegrationKit, error) {
+       var best *v1.IntegrationKit
+       highest := 0
+       for i := range kits {
+               candidate := &kits[i]
+               if candidate.Status.Phase != v1.IntegrationKitPhaseReady {
+                       continue
+               }
+               value, err := strconv.Atoi(candidate.Labels[v1.IntegrationKitPriorityLabel])
+               if err != nil {
+                       return nil, err
+               }
+               if value > highest {
+                       best, highest = candidate, value
+               }
+       }
+       return best, nil
+}

Reply via email to