This is an automated email from the ASF dual-hosted git repository. astefanutti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 48525daa7028d7fc298463a3649539faae1cfc26 Author: Antonin Stefanutti <[email protected]> AuthorDate: Mon Aug 30 17:21:13 2021 +0200 feat(native): Automatic rollout to highest priority kit --- pkg/controller/integration/build_kit.go | 216 +-------------------- pkg/controller/integration/build_kit_test.go | 8 +- .../integration/integration_controller.go | 66 ++++--- .../integration/{build_kit.go => kits.go} | 167 ++-------------- pkg/controller/integration/monitor.go | 55 +++++- 5 files changed, 112 insertions(+), 400 deletions(-) diff --git a/pkg/controller/integration/build_kit.go b/pkg/controller/integration/build_kit.go index ff396dd..a0fee46 100644 --- a/pkg/controller/integration/build_kit.go +++ b/pkg/controller/integration/build_kit.go @@ -19,19 +19,10 @@ package integration import ( "context" - "encoding/json" - "reflect" "github.com/pkg/errors" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - - ctrl "sigs.k8s.io/controller-runtime/pkg/client" - v1 "github.com/apache/camel-k/pkg/apis/camel/v1" - "github.com/apache/camel-k/pkg/platform" "github.com/apache/camel-k/pkg/trait" "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" @@ -64,7 +55,7 @@ func (action *buildKitAction) Handle(ctx context.Context, integration *v1.Integr } if kit.Labels[v1.IntegrationKitTypeLabel] == v1.IntegrationKitTypePlatform { - match, err := action.integrationMatches(integration, kit) + match, err := integrationMatches(integration, kit) if err != nil { return nil, err } else if !match { @@ -91,7 +82,7 @@ func (action *buildKitAction) Handle(ctx context.Context, integration *v1.Integr return nil, nil } - existingKits, err := action.lookupKitsForIntegration(ctx, action.client, integration) + existingKits, err := lookupKitsForIntegration(ctx, action.client, integration) if err != nil { return nil, err } @@ -138,79 +129,6 @@ kits: return integration, nil } -func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c ctrl.Reader, integration *v1.Integration) ([]v1.IntegrationKit, error) { - pl, err := platform.GetCurrent(ctx, c, integration.Namespace) - if err != nil && !k8serrors.IsNotFound(err) { - return nil, err - } - - kitTypes, err := labels.NewRequirement(v1.IntegrationKitTypeLabel, selection.In, []string{ - v1.IntegrationKitTypePlatform, - v1.IntegrationKitTypeExternal, - }) - if err != nil { - return nil, err - } - - options := []ctrl.ListOption{ - ctrl.InNamespace(integration.GetIntegrationKitNamespace(pl)), - ctrl.MatchingLabels{ - "camel.apache.org/runtime.version": integration.Status.RuntimeVersion, - "camel.apache.org/runtime.provider": string(integration.Status.RuntimeProvider), - }, - ctrl.MatchingLabelsSelector{ - Selector: labels.NewSelector().Add(*kitTypes), - }, - } - - list := v1.NewIntegrationKitList() - if err := c.List(ctx, &list, options...); err != nil { - return nil, err - } - - kits := make([]v1.IntegrationKit, 0) - for _, kit := range list.Items { - match, err := action.integrationMatches(integration, &kit) - if err != nil { - return nil, err - } else if !match { - continue - } - kits = append(kits, kit) - } - - return kits, nil -} - -// integrationMatches returns whether the v1.IntegrationKit meets the requirements of the v1.Integration -func (action *buildKitAction) integrationMatches(integration *v1.Integration, kit *v1.IntegrationKit) (bool, error) { - if kit.Status.Phase == v1.IntegrationKitPhaseError { - return false, nil - } - if kit.Status.Version != integration.Status.Version { - return false, nil - } - if len(integration.Status.Dependencies) != len(kit.Spec.Dependencies) { - return false, nil - } - // When a platform kit is created it inherits the traits from the integrations and as - // some traits may influence the build thus the artifacts present on the container image, - // we need to take traits into account when looking up for compatible kits. - // - // It could also happen that an integration is updated and a trait is modified, if we do - // not include traits in the lookup, we may use a kit that does not have all the - // characteristics required by the integration. - // - // A kit can be used only if it contains a subset of the traits and related configurations - // declared on integration. - if match, err := action.hasMatchingTraits(integration, kit); !match || err != nil { - return false, err - } - if !util.StringSliceContains(kit.Spec.Dependencies, integration.Status.Dependencies) { - return false, nil - } - return true, nil -} // kitMatches returns whether the v1.IntegrationKit match func (action *buildKitAction) kitMatches(k1 *v1.IntegrationKit, k2 *v1.IntegrationKit) (bool, error) { @@ -233,7 +151,7 @@ func (action *buildKitAction) kitMatches(k1 *v1.IntegrationKit, k2 *v1.Integrati if !ok { return false, nil } - match, err := action.hasMatchingTrait(&kt1, &kt2) + match, err := hasMatchingTrait(&kt1, &kt2) if !match || err != nil { return false, err } @@ -243,131 +161,3 @@ func (action *buildKitAction) kitMatches(k1 *v1.IntegrationKit, k2 *v1.Integrati } return true, nil } - -// hasMatchingTraits compares the traits defined on the v1.Integration with those defined on the v1.IntegrationKit -func (action *buildKitAction) hasMatchingTraits(integration *v1.Integration, kit *v1.IntegrationKit) (bool, error) { - catalog := trait.NewCatalog(action.client) - - traitCount := 0 - for name, itTrait := range integration.Spec.Traits { - t := catalog.GetTrait(name) - if t != nil && !t.InfluencesKit() { - // We don't store the trait configuration if the trait cannot influence the kit behavior - continue - } - traitCount++ - kitTrait, ok := kit.Spec.Traits[name] - if !ok { - // skip it because trait configured on integration is not defined on kit - return false, nil - } - if ct, ok := t.(trait.ComparableTrait); ok { - comparable, err := action.hasComparableTrait(ct, &itTrait, &kitTrait) - if !comparable || err != nil { - return false, err - } - } else { - match, err := action.hasMatchingTrait(&itTrait, &kitTrait) - if !match || err != nil { - return false, err - } - } - } - - // Check the number of influencing traits matches - if len(kit.Spec.Traits) != traitCount { - return false, nil - } - - return true, nil -} - -func (action *buildKitAction) hasComparableTrait(c trait.ComparableTrait, itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) { - it := reflect.New(reflect.TypeOf(c).Elem()).Interface() - data, err := json.Marshal(itTrait.Configuration) - if err != nil { - return false, err - } - err = json.Unmarshal(data, &it) - if err != nil { - return false, err - } - - kt := reflect.New(reflect.TypeOf(c).Elem()).Interface() - data, err = json.Marshal(kitTrait.Configuration) - if err != nil { - return false, err - } - err = json.Unmarshal(data, &kt) - if err != nil { - return false, err - } - - return kt.(trait.ComparableTrait).Matches(it.(trait.Trait)), nil -} - -func (action *buildKitAction) hasMatchingTrait(itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) { - data, err := json.Marshal(itTrait.Configuration) - if err != nil { - return false, err - } - itConf := make(map[string]interface{}) - err = json.Unmarshal(data, &itConf) - if err != nil { - return false, err - } - data, err = json.Marshal(kitTrait.Configuration) - if err != nil { - return false, err - } - kitConf := make(map[string]interface{}) - err = json.Unmarshal(data, &kitConf) - if err != nil { - return false, err - } - for ck, cv := range kitConf { - iv, ok := itConf[ck] - if !ok { - // skip it because trait configured on kit has a value that is not defined - // in integration trait - return false, nil - } - if !equal(iv, cv) { - // skip it because trait configured on kit has a value that differs from - // the one configured on integration - return false, nil - } - } - return true, nil -} - -// We need to try to perform a slice equality in order to prevent a runtime panic -func equal(a, b interface{}) bool { - aSlice, aOk := a.([]interface{}) - bSlice, bOk := b.([]interface{}) - - if aOk && bOk { - // Both are slices - return sliceEqual(aSlice, bSlice) - } - - if aOk || bOk { - // One of the 2 is a slice - return false - } - - // None is a slice - return a == b -} - -func sliceEqual(a, b []interface{}) bool { - if len(a) != len(b) { - return false - } - for i, v := range a { - if v != b[i] { - return false - } - } - return true -} diff --git a/pkg/controller/integration/build_kit_test.go b/pkg/controller/integration/build_kit_test.go index a1538e3..430dc38 100644 --- a/pkg/controller/integration/build_kit_test.go +++ b/pkg/controller/integration/build_kit_test.go @@ -84,7 +84,7 @@ func TestLookupKitForIntegration_DiscardKitsInError(t *testing.T) { a.InjectLogger(log.Log) a.InjectClient(c) - kits, err := a.lookupKitsForIntegration(context.TODO(), c, &v1.Integration{ + kits, err := lookupKitsForIntegration(context.TODO(), c, &v1.Integration{ TypeMeta: metav1.TypeMeta{ APIVersion: v1.SchemeGroupVersion.String(), Kind: v1.IntegrationKind, @@ -207,7 +207,7 @@ func TestLookupKitForIntegration_DiscardKitsWithIncompatibleTraits(t *testing.T) a.InjectLogger(log.Log) a.InjectClient(c) - kits, err := a.lookupKitsForIntegration(context.TODO(), c, &v1.Integration{ + kits, err := lookupKitsForIntegration(context.TODO(), c, &v1.Integration{ TypeMeta: metav1.TypeMeta{ APIVersion: v1.SchemeGroupVersion.String(), Kind: v1.IntegrationKind, @@ -276,7 +276,7 @@ func TestHasMatchingTraits_KitNoTraitShouldNotBePicked(t *testing.T) { a := buildKitAction{} a.InjectLogger(log.Log) - ok, err := a.hasMatchingTraits(integration, integrationKitSpec) + ok, err := hasMatchingTraits(integration, integrationKitSpec) assert.Nil(t, err) assert.False(t, ok) } @@ -327,7 +327,7 @@ func TestHasMatchingTraits_KitSameTraitShouldBePicked(t *testing.T) { a := buildKitAction{} a.InjectLogger(log.Log) - ok, err := a.hasMatchingTraits(integration, integrationKitSpec) + ok, err := hasMatchingTraits(integration, integrationKitSpec) assert.Nil(t, err) assert.True(t, ok) } diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 0a4df39..55bf2ec 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -49,8 +49,6 @@ import ( "github.com/apache/camel-k/pkg/util/monitoring" ) -// Add creates a new Integration Controller and adds it to the Manager. The Manager will set fields on the Controller -// and Start it when the Manager is Started. func Add(mgr manager.Manager) error { c, err := client.FromManager(mgr) if err != nil { @@ -59,7 +57,6 @@ func Add(mgr manager.Manager) error { return add(mgr, newReconciler(mgr, c), c) } -// newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { return monitoring.NewInstrumentedReconciler( &reconcileIntegration{ @@ -75,7 +72,6 @@ func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { ) } -// add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { // Create a new controller c, err := controller.New("integration-controller", mgr, controller.Options{Reconciler: r}) @@ -92,7 +88,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { newIntegration := e.ObjectNew.(*v1.Integration) // Ignore updates to the integration status in which case metadata.Generation does not change, // or except when the integration phase changes as it's used to transition from one phase - // to another + // to another. return oldIntegration.Generation != newIntegration.Generation || oldIntegration.Status.Phase != newIntegration.Status.Phase }, @@ -106,40 +102,46 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { return err } - // Watch for IntegrationKit phase transitioning to ready or error and - // enqueue requests for any integrations that are in phase waiting for - // kit + // Watch for IntegrationKit phase transitioning to ready or error, and + // enqueue requests for any integration that matches the kit, in building + // or running phase. err = c.Watch(&source.Kind{Type: &v1.IntegrationKit{}}, handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { kit := a.(*v1.IntegrationKit) var requests []reconcile.Request - if kit.Status.Phase == v1.IntegrationKitPhaseReady || kit.Status.Phase == v1.IntegrationKitPhaseError { - list := &v1.IntegrationList{} + if kit.Status.Phase != v1.IntegrationKitPhaseReady && kit.Status.Phase != v1.IntegrationKitPhaseError { + return requests + } - // Do global search in case of global operator (it may be using a global platform) - var opts []ctrl.ListOption - if !platform.IsCurrentOperatorGlobal() { - opts = append(opts, ctrl.InNamespace(kit.Namespace)) - } + list := &v1.IntegrationList{} + // Do global search in case of global operator (it may be using a global platform) + var opts []ctrl.ListOption + if !platform.IsCurrentOperatorGlobal() { + opts = append(opts, ctrl.InNamespace(kit.Namespace)) + } + if err := mgr.GetClient().List(context.Background(), list, opts...); err != nil { + log.Error(err, "Failed to retrieve integration list") + return requests + } - if err := mgr.GetClient().List(context.TODO(), list, opts...); err != nil { - log.Error(err, "Failed to retrieve integration list") - return requests + for _, integration := range list.Items { + if match, err := integrationMatches(&integration, kit); err != nil { + log.Errorf(err, "Error matching integration %q with kit %q", integration.Name, kit.Name) + continue + } else if !match { + continue } - - for _, integration := range list.Items { - if integration.Status.Phase == v1.IntegrationPhaseBuildingKit { - log.Infof("Kit %s ready, wake-up integration: %s", kit.Name, integration.Name) - requests = append(requests, reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: integration.Namespace, - Name: integration.Name, - }, - }) - } + if integration.Status.Phase == v1.IntegrationPhaseBuildingKit || + integration.Status.Phase == v1.IntegrationPhaseRunning { + log.Infof("Kit %s ready, notify integration: %s", kit.Name, integration.Name) + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: integration.Namespace, + Name: integration.Name, + }, + }) } - } return requests @@ -228,7 +230,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { return err } - // Watch cronjob to update the ready condition + // Watch for CronJob to update the ready condition err = c.Watch(&source.Kind{Type: &v1beta1.CronJob{}}, &handler.EnqueueRequestForOwner{ OwnerType: &v1.Integration{}, IsController: false, @@ -261,7 +263,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler, cl client.Client) error { var _ reconcile.Reconciler = &reconcileIntegration{} -// reconcileIntegration reconciles a Integration object +// reconcileIntegration reconciles an Integration object type reconcileIntegration struct { // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the API server diff --git a/pkg/controller/integration/build_kit.go b/pkg/controller/integration/kits.go similarity index 53% copy from pkg/controller/integration/build_kit.go copy to pkg/controller/integration/kits.go index ff396dd..2a7eab0 100644 --- a/pkg/controller/integration/build_kit.go +++ b/pkg/controller/integration/kits.go @@ -22,8 +22,6 @@ import ( "encoding/json" "reflect" - "github.com/pkg/errors" - k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" @@ -34,111 +32,9 @@ import ( "github.com/apache/camel-k/pkg/platform" "github.com/apache/camel-k/pkg/trait" "github.com/apache/camel-k/pkg/util" - "github.com/apache/camel-k/pkg/util/defaults" - "github.com/apache/camel-k/pkg/util/kubernetes" ) -func newBuildKitAction() Action { - return &buildKitAction{} -} - -type buildKitAction struct { - baseAction -} - -func (action *buildKitAction) Name() string { - return "build-kit" -} - -func (action *buildKitAction) CanHandle(integration *v1.Integration) bool { - return integration.Status.Phase == v1.IntegrationPhaseBuildingKit -} - -func (action *buildKitAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { - // TODO: we may need to add a timeout strategy, i.e give up after some time in case of an unrecoverable error. - - if integration.Status.IntegrationKit != nil { - kit, err := kubernetes.GetIntegrationKit(ctx, action.client, integration.Status.IntegrationKit.Name, integration.Status.IntegrationKit.Namespace) - if err != nil { - return nil, errors.Wrapf(err, "unable to find integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err) - } - - if kit.Labels[v1.IntegrationKitTypeLabel] == v1.IntegrationKitTypePlatform { - match, err := action.integrationMatches(integration, kit) - if err != nil { - return nil, err - } else if !match { - // We need to re-generate a kit, or search for a new one that - // matches the integration, so let's remove the association - // with the kit. - integration.SetIntegrationKit(&v1.IntegrationKit{}) - return integration, nil - } - } - - if kit.Status.Phase == v1.IntegrationKitPhaseError { - integration.Status.Phase = v1.IntegrationPhaseError - integration.SetIntegrationKit(kit) - return integration, nil - } - - if kit.Status.Phase == v1.IntegrationKitPhaseReady { - integration.Status.Phase = v1.IntegrationPhaseDeploying - integration.SetIntegrationKit(kit) - return integration, nil - } - - return nil, nil - } - - existingKits, err := action.lookupKitsForIntegration(ctx, action.client, integration) - if err != nil { - return nil, err - } - - env, err := trait.Apply(ctx, action.client, integration, nil) - if err != nil { - return nil, err - } - - var integrationKit *v1.IntegrationKit -kits: - for _, kit := range env.IntegrationKits { - kit := kit - for i, k := range existingKits { - match, err := action.kitMatches(&kit, &k) - if err != nil { - return nil, err - } - if match { - if integrationKit == nil || - integrationKit.Status.Phase != v1.IntegrationKitPhaseReady && k.Status.Phase == v1.IntegrationKitPhaseReady || - integrationKit.Status.Phase == v1.IntegrationKitPhaseReady && k.Status.Phase == v1.IntegrationKitPhaseReady && k.HasHigherPriorityThan(integrationKit) { - integrationKit = &existingKits[i] - } - continue kits - } - } - if err := action.client.Create(ctx, &kit); err != nil { - return nil, err - } - if integrationKit == nil { - integrationKit = &kit - } - } - - // Set the kit name so the next handle loop, will fall through the - // same path as integration with a user defined kit - integration.SetIntegrationKit(integrationKit) - - if integrationKit.Status.Phase == v1.IntegrationKitPhaseReady { - integration.Status.Phase = v1.IntegrationPhaseDeploying - } - - return integration, nil -} - -func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c ctrl.Reader, integration *v1.Integration) ([]v1.IntegrationKit, error) { +func lookupKitsForIntegration(ctx context.Context, c ctrl.Reader, integration *v1.Integration, options ...ctrl.ListOption) ([]v1.IntegrationKit, error) { pl, err := platform.GetCurrent(ctx, c, integration.Namespace) if err != nil && !k8serrors.IsNotFound(err) { return nil, err @@ -152,7 +48,7 @@ func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c ct return nil, err } - options := []ctrl.ListOption{ + listOptions := []ctrl.ListOption{ ctrl.InNamespace(integration.GetIntegrationKitNamespace(pl)), ctrl.MatchingLabels{ "camel.apache.org/runtime.version": integration.Status.RuntimeVersion, @@ -162,15 +58,16 @@ func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c ct Selector: labels.NewSelector().Add(*kitTypes), }, } + listOptions = append(listOptions, options...) list := v1.NewIntegrationKitList() - if err := c.List(ctx, &list, options...); err != nil { + if err := c.List(ctx, &list, listOptions...); err != nil { return nil, err } kits := make([]v1.IntegrationKit, 0) for _, kit := range list.Items { - match, err := action.integrationMatches(integration, &kit) + match, err := integrationMatches(integration, &kit) if err != nil { return nil, err } else if !match { @@ -183,13 +80,19 @@ func (action *buildKitAction) lookupKitsForIntegration(ctx context.Context, c ct } // integrationMatches returns whether the v1.IntegrationKit meets the requirements of the v1.Integration -func (action *buildKitAction) integrationMatches(integration *v1.Integration, kit *v1.IntegrationKit) (bool, error) { +func integrationMatches(integration *v1.Integration, kit *v1.IntegrationKit) (bool, error) { if kit.Status.Phase == v1.IntegrationKitPhaseError { return false, nil } if kit.Status.Version != integration.Status.Version { return false, nil } + if kit.Status.RuntimeProvider != integration.Status.RuntimeProvider { + return false, nil + } + if kit.Status.RuntimeVersion != integration.Status.RuntimeVersion { + return false, nil + } if len(integration.Status.Dependencies) != len(kit.Spec.Dependencies) { return false, nil } @@ -203,7 +106,7 @@ func (action *buildKitAction) integrationMatches(integration *v1.Integration, ki // // A kit can be used only if it contains a subset of the traits and related configurations // declared on integration. - if match, err := action.hasMatchingTraits(integration, kit); !match || err != nil { + if match, err := hasMatchingTraits(integration, kit); !match || err != nil { return false, err } if !util.StringSliceContains(kit.Spec.Dependencies, integration.Status.Dependencies) { @@ -212,41 +115,9 @@ func (action *buildKitAction) integrationMatches(integration *v1.Integration, ki return true, nil } -// kitMatches returns whether the v1.IntegrationKit match -func (action *buildKitAction) kitMatches(k1 *v1.IntegrationKit, k2 *v1.IntegrationKit) (bool, error) { - version := k1.Status.Version - if version == "" { - // Defaults with the version that is going to be set during the kit initialization - version = defaults.Version - } - if version != k2.Status.Version { - return false, nil - } - if len(k1.Spec.Dependencies) != len(k2.Spec.Dependencies) { - return false, nil - } - if len(k1.Spec.Traits) != len(k2.Spec.Traits) { - return false, nil - } - for name, kt1 := range k1.Spec.Traits { - kt2, ok := k2.Spec.Traits[name] - if !ok { - return false, nil - } - match, err := action.hasMatchingTrait(&kt1, &kt2) - if !match || err != nil { - return false, err - } - } - if !util.StringSliceContains(k1.Spec.Dependencies, k2.Spec.Dependencies) { - return false, nil - } - return true, nil -} - // hasMatchingTraits compares the traits defined on the v1.Integration with those defined on the v1.IntegrationKit -func (action *buildKitAction) hasMatchingTraits(integration *v1.Integration, kit *v1.IntegrationKit) (bool, error) { - catalog := trait.NewCatalog(action.client) +func hasMatchingTraits(integration *v1.Integration, kit *v1.IntegrationKit) (bool, error) { + catalog := trait.NewCatalog(nil) traitCount := 0 for name, itTrait := range integration.Spec.Traits { @@ -262,12 +133,12 @@ func (action *buildKitAction) hasMatchingTraits(integration *v1.Integration, kit return false, nil } if ct, ok := t.(trait.ComparableTrait); ok { - comparable, err := action.hasComparableTrait(ct, &itTrait, &kitTrait) + comparable, err := hasComparableTrait(ct, &itTrait, &kitTrait) if !comparable || err != nil { return false, err } } else { - match, err := action.hasMatchingTrait(&itTrait, &kitTrait) + match, err := hasMatchingTrait(&itTrait, &kitTrait) if !match || err != nil { return false, err } @@ -282,7 +153,7 @@ func (action *buildKitAction) hasMatchingTraits(integration *v1.Integration, kit return true, nil } -func (action *buildKitAction) hasComparableTrait(c trait.ComparableTrait, itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) { +func hasComparableTrait(c trait.ComparableTrait, itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) { it := reflect.New(reflect.TypeOf(c).Elem()).Interface() data, err := json.Marshal(itTrait.Configuration) if err != nil { @@ -306,7 +177,7 @@ func (action *buildKitAction) hasComparableTrait(c trait.ComparableTrait, itTrai return kt.(trait.ComparableTrait).Matches(it.(trait.Trait)), nil } -func (action *buildKitAction) hasMatchingTrait(itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) { +func hasMatchingTrait(itTrait *v1.TraitSpec, kitTrait *v1.TraitSpec) (bool, error) { data, err := json.Marshal(itTrait.Configuration) if err != nil { return false, err diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 858b22b..532ad8f 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -19,13 +19,16 @@ package integration import ( "context" + "strconv" "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/trait" @@ -51,6 +54,7 @@ func (action *monitorAction) CanHandle(integration *v1.Integration) bool { } func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { + // Check if the Integration has changed hash, err := digest.ComputeForIntegration(integration) if err != nil { return nil, err @@ -70,6 +74,29 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra return nil, errors.Wrapf(err, "unable to find integration kit %s/%s, %s", integration.Status.IntegrationKit.Namespace, integration.Status.IntegrationKit.Name, err) } + // Check if an IntegrationKit with higher priority is ready + priority, ok := kit.Labels[v1.IntegrationKitPriorityLabel] + if !ok { + priority = "0" + } + withHigherPriority, err := labels.NewRequirement(v1.IntegrationKitPriorityLabel, selection.GreaterThan, []string{priority}) + if err != nil { + return nil, err + } + kits, err := lookupKitsForIntegration(ctx, action.client, integration, ctrl.MatchingLabelsSelector{ + Selector: labels.NewSelector().Add(*withHigherPriority), + }) + if err != nil { + return nil, err + } + priorityReadyKit, err := findHighestPriorityReadyKit(kits) + if err != nil { + return nil, err + } + if priorityReadyKit != nil { + integration.SetIntegrationKit(priorityReadyKit) + } + // Run traits that are enabled for the running phase _, err = trait.Apply(ctx, action.client, integration, kit) if err != nil { @@ -84,8 +111,8 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra // Check replicas replicaSets := &appsv1.ReplicaSetList{} err = action.client.List(ctx, replicaSets, - k8sclient.InNamespace(integration.Namespace), - k8sclient.MatchingLabels{ + ctrl.InNamespace(integration.Namespace), + ctrl.MatchingLabels{ v1.IntegrationLabel: integration.Name, }) if err != nil { @@ -160,3 +187,25 @@ func findLatestReplicaSet(list *appsv1.ReplicaSetList) *appsv1.ReplicaSet { } return &latest } + +func findHighestPriorityReadyKit(kits []v1.IntegrationKit) (*v1.IntegrationKit, error) { + if len(kits) == 0 { + return nil, nil + } + var kit *v1.IntegrationKit + priority := 0 + for i, k := range kits { + if k.Status.Phase != v1.IntegrationKitPhaseReady { + continue + } + p, err := strconv.Atoi(k.Labels[v1.IntegrationKitPriorityLabel]) + if err != nil { + return nil, err + } + if p > priority { + kit = &kits[i] + priority = p + } + } + return kit, nil +}
