This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 8ed11203bd06e972793ba8b566d4138f62cd01ec Author: nicolaferraro <[email protected]> AuthorDate: Thu Oct 7 17:33:59 2021 +0200 Fix #1943: allow multiple operators to reconcile label filtered resources --- e2e/builder/global_test.go | 9 +- e2e/common/cli/global_kamelet_test.go | 2 +- e2e/common/operator_id_filtering_test.go | 136 +++++++++++++++++++++ e2e/common/operator_metrics_test.go | 5 + e2e/support/test_support.go | 67 +++++++--- pkg/apis/camel/v1/common_types.go | 5 +- pkg/controller/build/build_controller.go | 9 +- .../integration/integration_controller.go | 9 +- pkg/controller/integrationkit/build.go | 5 + .../integrationkit/integrationkit_controller.go | 11 +- .../integrationplatform_controller.go | 9 +- pkg/controller/kamelet/kamelet_controller.go | 9 +- .../kameletbinding/kamelet_binding_controller.go | 9 +- pkg/platform/operator.go | 77 ++++++++++++ pkg/trait/container.go | 6 + pkg/trait/quarkus.go | 5 + pkg/util/defaults/defaults_support.go | 4 + pkg/util/defaults/defaults_test.go | 9 ++ 18 files changed, 351 insertions(+), 35 deletions(-) diff --git a/e2e/builder/global_test.go b/e2e/builder/global_test.go index 1853981..b91971a 100644 --- a/e2e/builder/global_test.go +++ b/e2e/builder/global_test.go @@ -24,9 +24,10 @@ package builder import ( "os" - ctrl "sigs.k8s.io/controller-runtime/pkg/client" "testing" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" @@ -51,7 +52,7 @@ func TestRunGlobalInstall(t *testing.T) { } test := func(operatorNamespace string) { - Expect(Kamel("install", "-n", operatorNamespace, "--global").Execute()).To(Succeed()) + Expect(Kamel("install", "-n", operatorNamespace, "--global", "--force").Execute()).To(Succeed()) t.Run("Global test on namespace with platform", func(t *testing.T) { WithNewTestNamespace(t, func(ns2 string) { @@ -146,9 +147,9 @@ func TestRunGlobalInstall(t *testing.T) { // global operators are always installed in the openshift-operators namespace RegisterTestingT(t) test("openshift-operators") - }else { + } else { // create new namespace for the global operator - WithNewTestNamespace(t,test) + WithNewTestNamespace(t, test) } } diff --git a/e2e/common/cli/global_kamelet_test.go b/e2e/common/cli/global_kamelet_test.go index a667b3b..0c890fa 100644 --- a/e2e/common/cli/global_kamelet_test.go +++ b/e2e/common/cli/global_kamelet_test.go @@ -47,7 +47,7 @@ func TestRunGlobalKamelet(t *testing.T) { } WithNewTestNamespace(t, func(ns string) { - Expect(Kamel("install", "-n", ns, "--global").Execute()).To(Succeed()) + Expect(Kamel("install", "-n", ns, "--global", "--force").Execute()).To(Succeed()) Expect(CreateTimerKamelet(ns, "my-own-timer-source")()).To(Succeed()) diff --git a/e2e/common/operator_id_filtering_test.go b/e2e/common/operator_id_filtering_test.go new file mode 100644 index 0000000..9473bf9 --- /dev/null +++ b/e2e/common/operator_id_filtering_test.go @@ -0,0 +1,136 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "fmt" + "os" + "testing" + "time" + + . "github.com/apache/camel-k/e2e/support" + camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util/openshift" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" +) + +func TestOperatorIDFiltering(t *testing.T) { + forceGlobalTest := os.Getenv("CAMEL_K_FORCE_GLOBAL_TEST") == "true" + if !forceGlobalTest { + ocp, err := openshift.IsOpenShift(TestClient()) + assert.Nil(t, err) + if ocp { + t.Skip("Prefer not to run on OpenShift to avoid giving more permissions to the user running tests") + return + } + } + + WithNewTestNamespace(t, func(ns string) { + WithNewTestNamespace(t, func(nsop1 string) { + WithNewTestNamespace(t, func(nsop2 string) { + Expect(Kamel("install", "-n", nsop1, "--operator-env-vars", "KAMEL_OPERATOR_ID=operator-1", "--global", "--force").Execute()).To(Succeed()) + Expect(AssignPlatformToOperator(nsop1, "operator-1")).To(Succeed()) + Eventually(PlatformPhase(nsop1), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPlatformPhaseReady)) + + Expect(Kamel("install", "-n", nsop2, "--operator-env-vars", "KAMEL_OPERATOR_ID=operator-2", "--global", "--force").Execute()).To(Succeed()) + Expect(AssignPlatformToOperator(nsop2, "operator-2")).To(Succeed()) + Eventually(PlatformPhase(nsop2), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPlatformPhaseReady)) + + t.Run("Operators ignore non-scoped integrations", func(t *testing.T) { + RegisterTestingT(t) + + Expect(Kamel("run", "-n", ns, "files/yaml.yaml", "--name", "untouched").Execute()).To(Succeed()) + Consistently(IntegrationPhase(ns, "untouched"), 10*time.Second).Should(BeEmpty()) + }) + + t.Run("Operators run scoped integrations", func(t *testing.T) { + RegisterTestingT(t) + + Expect(Kamel("run", "-n", ns, "files/yaml.yaml", "--name", "moving").Execute()).To(Succeed()) + Expect(AssignIntegrationToOperator(ns, "moving", "operator-1")).To(Succeed()) + Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning)) + Eventually(IntegrationPodPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning)) + Eventually(IntegrationLogs(ns, "moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + }) + + t.Run("Operators can handoff scoped integrations", func(t *testing.T) { + RegisterTestingT(t) + + Expect(AssignIntegrationToOperator(ns, "moving", "operator-2")).To(Succeed()) + Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning)) + Expect(Kamel("rebuild", "-n", ns, "moving").Execute()).To(Succeed()) + Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning)) + Eventually(IntegrationPodPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning)) + Eventually(IntegrationLogs(ns, "moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + }) + + t.Run("Operators can be deactivated after completely handing off scoped integrations", func(t *testing.T) { + RegisterTestingT(t) + + Expect(ScaleOperator(nsop1, 0)).To(Succeed()) + Expect(Kamel("rebuild", "-n", ns, "moving").Execute()).To(Succeed()) + Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning)) + Eventually(IntegrationPodPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning)) + Eventually(IntegrationLogs(ns, "moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + Expect(ScaleOperator(nsop1, 1)).To(Succeed()) + }) + + t.Run("Operators can run scoped integrations with fixed image", func(t *testing.T) { + RegisterTestingT(t) + + image := IntegrationPodImage(ns, "moving")() + Expect(image).NotTo(BeEmpty()) + // Save resources by deleting "moving" integration + Expect(Kamel("delete", "moving", "-n", ns).Execute()).To(Succeed()) + + Expect(Kamel("run", "-n", ns, "files/yaml.yaml", "--name", "pre-built", "-t", fmt.Sprintf("container.image=%s", image)).Execute()).To(Succeed()) + Consistently(IntegrationPhase(ns, "pre-built"), 10*time.Second).Should(BeEmpty()) + Expect(AssignIntegrationToOperator(ns, "pre-built", "operator-2")).To(Succeed()) + Eventually(IntegrationPhase(ns, "pre-built"), TestTimeoutShort).Should(Equal(camelv1.IntegrationPhaseRunning)) + Eventually(IntegrationPodPhase(ns, "pre-built"), TestTimeoutMedium).Should(Equal(corev1.PodRunning)) + Eventually(IntegrationLogs(ns, "pre-built"), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + Expect(Kamel("delete", "pre-built", "-n", ns).Execute()).To(Succeed()) + }) + + t.Run("Operators can run scoped kamelet bindings", func(t *testing.T) { + RegisterTestingT(t) + + Expect(Kamel("bind", "-n", ns, "timer-source?message=Hello", "log-sink", "--name", "klb").Execute()).To(Succeed()) + Consistently(Integration(ns, "klb"), 10*time.Second).Should(BeNil()) + + Expect(AssignKameletBindingToOperator(ns, "klb", "operator-1")).To(Succeed()) + Eventually(Integration(ns, "klb"), TestTimeoutShort).ShouldNot(BeNil()) + Eventually(IntegrationPhase(ns, "klb"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning)) + Eventually(IntegrationPodPhase(ns, "klb"), TestTimeoutMedium).Should(Equal(corev1.PodRunning)) + }) + + }) + }) + + // Clean up + RegisterTestingT(t) + Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed()) + }) +} diff --git a/e2e/common/operator_metrics_test.go b/e2e/common/operator_metrics_test.go index ad3aeb4..226391e 100644 --- a/e2e/common/operator_metrics_test.go +++ b/e2e/common/operator_metrics_test.go @@ -72,6 +72,7 @@ func TestMetrics(t *testing.T) { Expect(build).NotTo(BeNil()) t.Run("Build duration metric", func(t *testing.T) { + RegisterTestingT(t) // Get the duration from the Build status duration, err := time.ParseDuration(build.Status.Duration) Expect(err).To(BeNil()) @@ -130,6 +131,7 @@ func TestMetrics(t *testing.T) { }) t.Run("Build recovery attempts metric", func(t *testing.T) { + RegisterTestingT(t) // Check there are no failures reported in the Build status Expect(build.Status.Failure).To(BeNil()) @@ -167,6 +169,7 @@ func TestMetrics(t *testing.T) { }) t.Run("reconciliation duration metric", func(t *testing.T) { + RegisterTestingT(t) Expect(metrics).To(HaveKeyWithValue("camel_k_reconciliation_duration_seconds", PointTo(MatchFields(IgnoreExtras, Fields{ "Name": EqualP("camel_k_reconciliation_duration_seconds"), @@ -345,6 +348,7 @@ func TestMetrics(t *testing.T) { }) t.Run("Build queue duration metric", func(t *testing.T) { + RegisterTestingT(t) var ts1, ts2 time.Time // The start queuing time is taken from the creation time ts1 = build.CreationTimestamp.Time @@ -401,6 +405,7 @@ func TestMetrics(t *testing.T) { }) t.Run("Integration first readiness metric", func(t *testing.T) { + RegisterTestingT(t) var ts1, ts2 time.Time // The start time is taken from the Integration status initialization timestamp diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 604adcf..60fce20 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -39,6 +39,7 @@ import ( "github.com/google/uuid" "github.com/onsi/gomega" + "github.com/pkg/errors" "github.com/spf13/cobra" appsv1 "k8s.io/api/apps/v1" @@ -422,6 +423,18 @@ func IntegrationCondition(ns string, name string, conditionType v1.IntegrationCo } } +func AssignIntegrationToOperator(ns, name, operator string) error { + it := Integration(ns, name)() + if it == nil { + return fmt.Errorf("cannot assign integration %q to operator: integration not found", name) + } + if it.Labels == nil { + it.Labels = make(map[string]string) + } + it.Labels[v1.OperatorIDLabel] = operator + return TestClient().Update(TestContext, it) +} + func Lease(ns string, name string) func() *coordination.Lease { return func() *coordination.Lease { lease := coordination.Lease{} @@ -727,6 +740,18 @@ func ScaleKameletBinding(ns string, name string, replicas int32) error { }) } +func AssignKameletBindingToOperator(ns, name, operator string) error { + klb := KameletBinding(ns, name)() + if klb == nil { + return fmt.Errorf("cannot assign kamelet binding %q to operator: kamelet binding not found", name) + } + if klb.Labels == nil { + klb.Labels = make(map[string]string) + } + klb.Labels[v1.OperatorIDLabel] = operator + return TestClient().Update(TestContext, klb) +} + type KitFilter interface { Match(*v1.IntegrationKit) bool } @@ -1043,6 +1068,18 @@ func PlatformProfile(ns string) func() v1.TraitProfile { } } +func AssignPlatformToOperator(ns, operator string) error { + pl := Platform(ns)() + if pl == nil { + return errors.New("cannot assign platform to operator: no platform found") + } + if pl.Labels == nil { + pl.Labels = make(map[string]string) + } + pl.Labels[v1.OperatorIDLabel] = operator + return TestClient().Update(TestContext, pl) +} + func CRDs() func() []metav1.APIResource { return func() []metav1.APIResource { @@ -1115,24 +1152,22 @@ func OperatorTryPodForceKill(ns string, timeSeconds int) { } } -func ScaleOperator(ns string, replicas int32) func() error { - return func() error { - operator, err := TestClient().AppsV1().Deployments(ns).Get(TestContext, "camel-k-operator", metav1.GetOptions{}) - if err != nil { - return err - } - operator.Spec.Replicas = &replicas - _, err = TestClient().AppsV1().Deployments(ns).Update(TestContext, operator, metav1.UpdateOptions{}) - if err != nil { - return err - } +func ScaleOperator(ns string, replicas int32) error { + operator, err := TestClient().AppsV1().Deployments(ns).Get(TestContext, "camel-k-operator", metav1.GetOptions{}) + if err != nil { + return err + } + operator.Spec.Replicas = &replicas + _, err = TestClient().AppsV1().Deployments(ns).Update(TestContext, operator, metav1.UpdateOptions{}) + if err != nil { + return err + } - if replicas == 0 { - // speedup scale down by killing the pod - OperatorTryPodForceKill(ns, 10) - } - return nil + if replicas == 0 { + // speedup scale down by killing the pod + OperatorTryPodForceKill(ns, 10) } + return nil } func ClusterRole() func() []rbacv1.ClusterRole { diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go index c6d85be..8c2cada 100644 --- a/pkg/apis/camel/v1/common_types.go +++ b/pkg/apis/camel/v1/common_types.go @@ -22,7 +22,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const TraitAnnotationPrefix = "trait.camel.apache.org/" +const ( + TraitAnnotationPrefix = "trait.camel.apache.org/" + OperatorIDLabel = "camel.apache.org/operator.id" +) // ConfigurationSpec -- type ConfigurationSpec struct { diff --git a/pkg/controller/build/build_controller.go b/pkg/controller/build/build_controller.go index 99ca1d0..e8c1a2d 100644 --- a/pkg/controller/build/build_controller.go +++ b/pkg/controller/build/build_controller.go @@ -30,7 +30,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" @@ -71,7 +70,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { Named("build-controller"). // Watch for changes to primary resource Build For(&v1.Build{}, builder.WithPredicates( - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { oldBuild := e.ObjectOld.(*v1.Build) newBuild := e.ObjectNew.(*v1.Build) @@ -130,6 +129,12 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, request reconcile.Reques return reconcile.Result{}, err } + // Only process resources assigned to the operator + if !platform.IsOperatorHandler(&instance) { + rlog.Info("Ignoring request because resource is not assigned to current operator") + return reconcile.Result{}, nil + } + target := instance.DeepCopy() targetLog := rlog.ForBuild(target) diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index cb67e53..239cbba 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -36,7 +36,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -81,7 +80,7 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error { Named("integration-controller"). // Watch for changes to primary resource Integration For(&v1.Integration{}, builder.WithPredicates( - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { old := e.ObjectOld.(*v1.Integration) it := e.ObjectNew.(*v1.Integration) @@ -262,6 +261,12 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile. return reconcile.Result{}, err } + // Only process resources assigned to the operator + if !platform.IsOperatorHandler(&instance) { + rlog.Info("Ignoring request because resource is not assigned to current operator") + return reconcile.Result{}, nil + } + target := instance.DeepCopy() targetLog := rlog.ForIntegration(target) diff --git a/pkg/controller/integrationkit/build.go b/pkg/controller/integrationkit/build.go index 4e8a688..5be8f7a 100644 --- a/pkg/controller/integrationkit/build.go +++ b/pkg/controller/integrationkit/build.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/apache/camel-k/pkg/util/defaults" "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -85,6 +86,10 @@ func (action *buildAction) handleBuildSubmitted(ctx context.Context, kit *v1.Int labels := kubernetes.FilterCamelCreatorLabels(kit.Labels) labels[v1.IntegrationKitLayoutLabel] = kit.Labels[v1.IntegrationKitLayoutLabel] + operatorID := defaults.OperatorID() + if operatorID != "" { + labels[v1.OperatorIDLabel] = operatorID + } timeout := env.Platform.Status.Build.GetTimeout() if layout := labels[v1.IntegrationKitLayoutLabel]; env.Platform.Spec.Build.Timeout == nil && layout == v1.IntegrationKitLayoutNative { // Increase the timeout to a sensible default diff --git a/pkg/controller/integrationkit/integrationkit_controller.go b/pkg/controller/integrationkit/integrationkit_controller.go index 26b969b..1eebefa 100644 --- a/pkg/controller/integrationkit/integrationkit_controller.go +++ b/pkg/controller/integrationkit/integrationkit_controller.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -81,7 +80,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Watch for changes to primary resource IntegrationKit err = c.Watch(&source.Kind{Type: &v1.IntegrationKit{}}, &handler.EnqueueRequestForObject{}, - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { oldIntegrationKit := e.ObjectOld.(*v1.IntegrationKit) newIntegrationKit := e.ObjectNew.(*v1.IntegrationKit) @@ -107,7 +106,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { IsController: true, OwnerType: &v1.IntegrationKit{}, }, - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { oldBuild := e.ObjectOld.(*v1.Build) newBuild := e.ObjectNew.(*v1.Build) @@ -202,6 +201,12 @@ func (r *reconcileIntegrationKit) Reconcile(ctx context.Context, request reconci return reconcile.Result{}, err } + // Only process resources assigned to the operator + if !platform.IsOperatorHandler(&instance) { + rlog.Info("Ignoring request because resource is not assigned to current operator") + return reconcile.Result{}, nil + } + target := instance.DeepCopy() targetLog := rlog.ForIntegrationKit(target) diff --git a/pkg/controller/integrationplatform/integrationplatform_controller.go b/pkg/controller/integrationplatform/integrationplatform_controller.go index 264ae4d..66880df 100644 --- a/pkg/controller/integrationplatform/integrationplatform_controller.go +++ b/pkg/controller/integrationplatform/integrationplatform_controller.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -80,7 +79,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Watch for changes to primary resource IntegrationPlatform err = c.Watch(&source.Kind{Type: &v1.IntegrationPlatform{}}, &handler.EnqueueRequestForObject{}, - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { oldIntegrationPlatform := e.ObjectOld.(*v1.IntegrationPlatform) newIntegrationPlatform := e.ObjectNew.(*v1.IntegrationPlatform) @@ -149,6 +148,12 @@ func (r *reconcileIntegrationPlatform) Reconcile(ctx context.Context, request re return reconcile.Result{}, err } + // Only process resources assigned to the operator + if !platform.IsOperatorHandler(&instance) { + rlog.Info("Ignoring request because resource is not assigned to current operator") + return reconcile.Result{}, nil + } + actions := []Action{ NewInitializeAction(), NewWarmAction(r.reader), diff --git a/pkg/controller/kamelet/kamelet_controller.go b/pkg/controller/kamelet/kamelet_controller.go index fc12f19..5bc2a07 100644 --- a/pkg/controller/kamelet/kamelet_controller.go +++ b/pkg/controller/kamelet/kamelet_controller.go @@ -32,7 +32,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -72,7 +71,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { Named("kamelet-controller"). // Watch for changes to primary resource Kamelet For(&v1alpha1.Kamelet{}, builder.WithPredicates( - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { oldKamelet := e.ObjectOld.(*v1alpha1.Kamelet) newKamelet := e.ObjectNew.(*v1alpha1.Kamelet) @@ -137,6 +136,12 @@ func (r *reconcileKamelet) Reconcile(ctx context.Context, request reconcile.Requ return reconcile.Result{}, err } + // Only process resources assigned to the operator + if !platform.IsOperatorHandler(&instance) { + rlog.Info("Ignoring request because resource is not assigned to current operator") + return reconcile.Result{}, nil + } + actions := []Action{ NewInitializeAction(), NewMonitorAction(), diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go index ff23f65..1432ee9 100644 --- a/pkg/controller/kameletbinding/kamelet_binding_controller.go +++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -78,7 +77,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Watch for changes to primary resource KameletBinding err = c.Watch(&source.Kind{Type: &v1alpha1.KameletBinding{}}, &handler.EnqueueRequestForObject{}, - predicate.Funcs{ + platform.FilteringFuncs{ UpdateFunc: func(e event.UpdateEvent) bool { oldKameletBinding := e.ObjectOld.(*v1alpha1.KameletBinding) newKameletBinding := e.ObjectNew.(*v1alpha1.KameletBinding) @@ -154,6 +153,12 @@ func (r *ReconcileKameletBinding) Reconcile(ctx context.Context, request reconci return reconcile.Result{}, err } + // Only process resources assigned to the operator + if !platform.IsOperatorHandler(&instance) { + rlog.Info("Ignoring request because resource is not assigned to current operator") + return reconcile.Result{}, nil + } + actions := []Action{ NewInitializeAction(), NewMonitorAction(), diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go index 8842ab6..d34c898 100644 --- a/pkg/platform/operator.go +++ b/pkg/platform/operator.go @@ -22,8 +22,12 @@ import ( "os" "strings" + camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util/defaults" coordination "k8s.io/api/coordination/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" ctrl "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -99,3 +103,76 @@ func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace } return !alreadyOwned, nil } + +func IsOperatorHandler(object ctrl.Object) bool { + if object == nil { + return true + } + resourceID := object.GetLabels()[camelv1.OperatorIDLabel] + operatorID := defaults.OperatorID() + return resourceID == operatorID +} + +// FilteringFuncs do preliminary checks to determine if certain events should be handled by the controller +// based on labels on the resources (e.g. camel.apache.org/operator.id) and the operator configuration, +// before handing the computation over to the user code. +type FilteringFuncs struct { + // Create returns true if the Create event should be processed + CreateFunc func(event.CreateEvent) bool + + // Delete returns true if the Delete event should be processed + DeleteFunc func(event.DeleteEvent) bool + + // Update returns true if the Update event should be processed + UpdateFunc func(event.UpdateEvent) bool + + // Generic returns true if the Generic event should be processed + GenericFunc func(event.GenericEvent) bool +} + +func (f FilteringFuncs) Create(e event.CreateEvent) bool { + if !IsOperatorHandler(e.Object) { + return false + } + if f.CreateFunc != nil { + return f.CreateFunc(e) + } + return true +} + +func (f FilteringFuncs) Delete(e event.DeleteEvent) bool { + if !IsOperatorHandler(e.Object) { + return false + } + if f.DeleteFunc != nil { + return f.DeleteFunc(e) + } + return true +} + +func (f FilteringFuncs) Update(e event.UpdateEvent) bool { + if !IsOperatorHandler(e.ObjectNew) { + return false + } + if e.ObjectOld != nil && e.ObjectNew != nil && + e.ObjectOld.GetLabels()[camelv1.OperatorIDLabel] != e.ObjectNew.GetLabels()[camelv1.OperatorIDLabel] { + // Always force reconciliation when the object becomes managed by the current operator + return true + } + if f.UpdateFunc != nil { + return f.UpdateFunc(e) + } + return true +} + +func (f FilteringFuncs) Generic(e event.GenericEvent) bool { + if !IsOperatorHandler(e.Object) { + return false + } + if f.GenericFunc != nil { + return f.GenericFunc(e) + } + return true +} + +var _ predicate.Predicate = FilteringFuncs{} diff --git a/pkg/trait/container.go b/pkg/trait/container.go index 9685e17..ea4973f 100644 --- a/pkg/trait/container.go +++ b/pkg/trait/container.go @@ -22,6 +22,7 @@ import ( "path" "sort" + "github.com/apache/camel-k/pkg/util/defaults" appsv1 "k8s.io/api/apps/v1" "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" @@ -215,6 +216,11 @@ func (t *containerTrait) configureImageIntegrationKit(e *Environment) error { kubernetes.CamelCreatorLabelVersion: e.Integration.ResourceVersion, } + operatorID := defaults.OperatorID() + if operatorID != "" { + kit.Labels[v1.OperatorIDLabel] = operatorID + } + t.L.Infof("image %s", kit.Spec.Image) e.Resources.Add(kit) e.Integration.SetIntegrationKit(kit) diff --git a/pkg/trait/quarkus.go b/pkg/trait/quarkus.go index f10242d..2b53549 100644 --- a/pkg/trait/quarkus.go +++ b/pkg/trait/quarkus.go @@ -261,6 +261,11 @@ func (t *quarkusTrait) newIntegrationKit(e *Environment, packageType quarkusPack kubernetes.CamelCreatorLabelVersion: integration.ResourceVersion, } + operatorID := defaults.OperatorID() + if operatorID != "" { + kit.Labels[v1.OperatorIDLabel] = operatorID + } + traits := t.getKitTraits(e) kit.Spec = v1.IntegrationKitSpec{ diff --git a/pkg/util/defaults/defaults_support.go b/pkg/util/defaults/defaults_support.go index 6849c15..a50ac2a 100644 --- a/pkg/util/defaults/defaults_support.go +++ b/pkg/util/defaults/defaults_support.go @@ -32,6 +32,10 @@ func InstallDefaultKamelets() bool { return boolEnvOrDefault(installDefaultKamelets, "KAMEL_INSTALL_DEFAULT_KAMELETS") } +func OperatorID() string { + return envOrDefault("", "KAMEL_OPERATOR_ID") +} + func boolEnvOrDefault(def bool, envs ...string) bool { strVal := envOrDefault(strconv.FormatBool(def), envs...) res, err := strconv.ParseBool(strVal) diff --git a/pkg/util/defaults/defaults_test.go b/pkg/util/defaults/defaults_test.go index e49611e..1bc8dbb 100644 --- a/pkg/util/defaults/defaults_test.go +++ b/pkg/util/defaults/defaults_test.go @@ -49,3 +49,12 @@ func TestOverriddenInstallDefaultKamelets(t *testing.T) { assert.False(t, InstallDefaultKamelets()) assert.NoError(t, os.Setenv(env, oldEnvVal)) } + +func TestOverriddenOperatorID(t *testing.T) { + env := "KAMEL_OPERATOR_ID" + oldEnvVal := os.Getenv(env) + overriddenID := "operator-1" + assert.NoError(t, os.Setenv(env, overriddenID)) + assert.Equal(t, overriddenID, OperatorID()) + assert.NoError(t, os.Setenv(env, oldEnvVal)) +}
