This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 8ed11203bd06e972793ba8b566d4138f62cd01ec
Author: nicolaferraro <[email protected]>
AuthorDate: Thu Oct 7 17:33:59 2021 +0200

    Fix #1943: allow multiple operators to reconcile label filtered resources
---
 e2e/builder/global_test.go                         |   9 +-
 e2e/common/cli/global_kamelet_test.go              |   2 +-
 e2e/common/operator_id_filtering_test.go           | 136 +++++++++++++++++++++
 e2e/common/operator_metrics_test.go                |   5 +
 e2e/support/test_support.go                        |  67 +++++++---
 pkg/apis/camel/v1/common_types.go                  |   5 +-
 pkg/controller/build/build_controller.go           |   9 +-
 .../integration/integration_controller.go          |   9 +-
 pkg/controller/integrationkit/build.go             |   5 +
 .../integrationkit/integrationkit_controller.go    |  11 +-
 .../integrationplatform_controller.go              |   9 +-
 pkg/controller/kamelet/kamelet_controller.go       |   9 +-
 .../kameletbinding/kamelet_binding_controller.go   |   9 +-
 pkg/platform/operator.go                           |  77 ++++++++++++
 pkg/trait/container.go                             |   6 +
 pkg/trait/quarkus.go                               |   5 +
 pkg/util/defaults/defaults_support.go              |   4 +
 pkg/util/defaults/defaults_test.go                 |   9 ++
 18 files changed, 351 insertions(+), 35 deletions(-)

diff --git a/e2e/builder/global_test.go b/e2e/builder/global_test.go
index 1853981..b91971a 100644
--- a/e2e/builder/global_test.go
+++ b/e2e/builder/global_test.go
@@ -24,9 +24,10 @@ package builder
 
 import (
        "os"
-       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
        "testing"
 
+       ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
        . "github.com/onsi/gomega"
        "github.com/stretchr/testify/assert"
 
@@ -51,7 +52,7 @@ func TestRunGlobalInstall(t *testing.T) {
        }
 
        test := func(operatorNamespace string) {
-               Expect(Kamel("install", "-n", operatorNamespace, 
"--global").Execute()).To(Succeed())
+               Expect(Kamel("install", "-n", operatorNamespace, "--global", 
"--force").Execute()).To(Succeed())
 
                t.Run("Global test on namespace with platform", func(t 
*testing.T) {
                        WithNewTestNamespace(t, func(ns2 string) {
@@ -146,9 +147,9 @@ func TestRunGlobalInstall(t *testing.T) {
                 // global operators are always installed in the openshift-operators namespace
                RegisterTestingT(t)
                test("openshift-operators")
-       }else {
+       } else {
                // create new namespace for the global operator
-               WithNewTestNamespace(t,test)
+               WithNewTestNamespace(t, test)
        }
 }
 
diff --git a/e2e/common/cli/global_kamelet_test.go b/e2e/common/cli/global_kamelet_test.go
index a667b3b..0c890fa 100644
--- a/e2e/common/cli/global_kamelet_test.go
+++ b/e2e/common/cli/global_kamelet_test.go
@@ -47,7 +47,7 @@ func TestRunGlobalKamelet(t *testing.T) {
        }
 
        WithNewTestNamespace(t, func(ns string) {
-               Expect(Kamel("install", "-n", ns, 
"--global").Execute()).To(Succeed())
+               Expect(Kamel("install", "-n", ns, "--global", 
"--force").Execute()).To(Succeed())
 
                Expect(CreateTimerKamelet(ns, 
"my-own-timer-source")()).To(Succeed())
 
diff --git a/e2e/common/operator_id_filtering_test.go b/e2e/common/operator_id_filtering_test.go
new file mode 100644
index 0000000..9473bf9
--- /dev/null
+++ b/e2e/common/operator_id_filtering_test.go
@@ -0,0 +1,136 @@
+//go:build integration
+// +build integration
+
+// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"
+
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package common
+
+import (
+       "fmt"
+       "os"
+       "testing"
+       "time"
+
+       . "github.com/apache/camel-k/e2e/support"
+       camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/util/openshift"
+       . "github.com/onsi/gomega"
+       "github.com/stretchr/testify/assert"
+       corev1 "k8s.io/api/core/v1"
+)
+
+func TestOperatorIDFiltering(t *testing.T) {
+       forceGlobalTest := os.Getenv("CAMEL_K_FORCE_GLOBAL_TEST") == "true"
+       if !forceGlobalTest {
+               ocp, err := openshift.IsOpenShift(TestClient())
+               assert.Nil(t, err)
+               if ocp {
+                       t.Skip("Prefer not to run on OpenShift to avoid giving 
more permissions to the user running tests")
+                       return
+               }
+       }
+
+       WithNewTestNamespace(t, func(ns string) {
+               WithNewTestNamespace(t, func(nsop1 string) {
+                       WithNewTestNamespace(t, func(nsop2 string) {
+                               Expect(Kamel("install", "-n", nsop1, 
"--operator-env-vars", "KAMEL_OPERATOR_ID=operator-1", "--global", 
"--force").Execute()).To(Succeed())
+                               Expect(AssignPlatformToOperator(nsop1, 
"operator-1")).To(Succeed())
+                               Eventually(PlatformPhase(nsop1), 
TestTimeoutMedium).Should(Equal(camelv1.IntegrationPlatformPhaseReady))
+
+                               Expect(Kamel("install", "-n", nsop2, 
"--operator-env-vars", "KAMEL_OPERATOR_ID=operator-2", "--global", 
"--force").Execute()).To(Succeed())
+                               Expect(AssignPlatformToOperator(nsop2, 
"operator-2")).To(Succeed())
+                               Eventually(PlatformPhase(nsop2), 
TestTimeoutMedium).Should(Equal(camelv1.IntegrationPlatformPhaseReady))
+
+                               t.Run("Operators ignore non-scoped 
integrations", func(t *testing.T) {
+                                       RegisterTestingT(t)
+
+                                       Expect(Kamel("run", "-n", ns, 
"files/yaml.yaml", "--name", "untouched").Execute()).To(Succeed())
+                                       Consistently(IntegrationPhase(ns, 
"untouched"), 10*time.Second).Should(BeEmpty())
+                               })
+
+                               t.Run("Operators run scoped integrations", 
func(t *testing.T) {
+                                       RegisterTestingT(t)
+
+                                       Expect(Kamel("run", "-n", ns, 
"files/yaml.yaml", "--name", "moving").Execute()).To(Succeed())
+                                       Expect(AssignIntegrationToOperator(ns, 
"moving", "operator-1")).To(Succeed())
+                                       Eventually(IntegrationPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
+                                       Eventually(IntegrationPodPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
+                                       Eventually(IntegrationLogs(ns, 
"moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
+                               })
+
+                               t.Run("Operators can handoff scoped 
integrations", func(t *testing.T) {
+                                       RegisterTestingT(t)
+
+                                       Expect(AssignIntegrationToOperator(ns, 
"moving", "operator-2")).To(Succeed())
+                                       Eventually(IntegrationPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
+                                       Expect(Kamel("rebuild", "-n", ns, 
"moving").Execute()).To(Succeed())
+                                       Eventually(IntegrationPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
+                                       Eventually(IntegrationPodPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
+                                       Eventually(IntegrationLogs(ns, 
"moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
+                               })
+
+                               t.Run("Operators can be deactivated after 
completely handing off scoped integrations", func(t *testing.T) {
+                                       RegisterTestingT(t)
+
+                                       Expect(ScaleOperator(nsop1, 
0)).To(Succeed())
+                                       Expect(Kamel("rebuild", "-n", ns, 
"moving").Execute()).To(Succeed())
+                                       Eventually(IntegrationPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
+                                       Eventually(IntegrationPodPhase(ns, 
"moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
+                                       Eventually(IntegrationLogs(ns, 
"moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
+                                       Expect(ScaleOperator(nsop1, 
1)).To(Succeed())
+                               })
+
+                               t.Run("Operators can run scoped integrations 
with fixed image", func(t *testing.T) {
+                                       RegisterTestingT(t)
+
+                                       image := IntegrationPodImage(ns, 
"moving")()
+                                       Expect(image).NotTo(BeEmpty())
+                                       // Save resources by deleting "moving" integration
+                                       Expect(Kamel("delete", "moving", "-n", 
ns).Execute()).To(Succeed())
+
+                                       Expect(Kamel("run", "-n", ns, 
"files/yaml.yaml", "--name", "pre-built", "-t", 
fmt.Sprintf("container.image=%s", image)).Execute()).To(Succeed())
+                                       Consistently(IntegrationPhase(ns, 
"pre-built"), 10*time.Second).Should(BeEmpty())
+                                       Expect(AssignIntegrationToOperator(ns, 
"pre-built", "operator-2")).To(Succeed())
+                                       Eventually(IntegrationPhase(ns, 
"pre-built"), TestTimeoutShort).Should(Equal(camelv1.IntegrationPhaseRunning))
+                                       Eventually(IntegrationPodPhase(ns, 
"pre-built"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
+                                       Eventually(IntegrationLogs(ns, 
"pre-built"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
+                                       Expect(Kamel("delete", "pre-built", 
"-n", ns).Execute()).To(Succeed())
+                               })
+
+                               t.Run("Operators can run scoped kamelet 
bindings", func(t *testing.T) {
+                                       RegisterTestingT(t)
+
+                                       Expect(Kamel("bind", "-n", ns, 
"timer-source?message=Hello", "log-sink", "--name", 
"klb").Execute()).To(Succeed())
+                                       Consistently(Integration(ns, "klb"), 
10*time.Second).Should(BeNil())
+
+                                       
Expect(AssignKameletBindingToOperator(ns, "klb", "operator-1")).To(Succeed())
+                                       Eventually(Integration(ns, "klb"), 
TestTimeoutShort).ShouldNot(BeNil())
+                                       Eventually(IntegrationPhase(ns, "klb"), 
TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
+                                       Eventually(IntegrationPodPhase(ns, 
"klb"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
+                               })
+
+                       })
+               })
+
+               // Clean up
+               RegisterTestingT(t)
+               Expect(Kamel("delete", "--all", "-n", 
ns).Execute()).To(Succeed())
+       })
+}
diff --git a/e2e/common/operator_metrics_test.go b/e2e/common/operator_metrics_test.go
index ad3aeb4..226391e 100644
--- a/e2e/common/operator_metrics_test.go
+++ b/e2e/common/operator_metrics_test.go
@@ -72,6 +72,7 @@ func TestMetrics(t *testing.T) {
                Expect(build).NotTo(BeNil())
 
                t.Run("Build duration metric", func(t *testing.T) {
+                       RegisterTestingT(t)
                        // Get the duration from the Build status
                        duration, err := 
time.ParseDuration(build.Status.Duration)
                        Expect(err).To(BeNil())
@@ -130,6 +131,7 @@ func TestMetrics(t *testing.T) {
                })
 
                t.Run("Build recovery attempts metric", func(t *testing.T) {
+                       RegisterTestingT(t)
                         // Check there are no failures reported in the Build status
                        Expect(build.Status.Failure).To(BeNil())
 
@@ -167,6 +169,7 @@ func TestMetrics(t *testing.T) {
                })
 
                t.Run("reconciliation duration metric", func(t *testing.T) {
+                       RegisterTestingT(t)
                        
Expect(metrics).To(HaveKeyWithValue("camel_k_reconciliation_duration_seconds",
                                PointTo(MatchFields(IgnoreExtras, Fields{
                                        "Name": 
EqualP("camel_k_reconciliation_duration_seconds"),
@@ -345,6 +348,7 @@ func TestMetrics(t *testing.T) {
                })
 
                t.Run("Build queue duration metric", func(t *testing.T) {
+                       RegisterTestingT(t)
                        var ts1, ts2 time.Time
                         // The start queuing time is taken from the creation time
                        ts1 = build.CreationTimestamp.Time
@@ -401,6 +405,7 @@ func TestMetrics(t *testing.T) {
                })
 
                t.Run("Integration first readiness metric", func(t *testing.T) {
+                       RegisterTestingT(t)
                        var ts1, ts2 time.Time
 
                         // The start time is taken from the Integration status initialization timestamp
diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go
index 604adcf..60fce20 100644
--- a/e2e/support/test_support.go
+++ b/e2e/support/test_support.go
@@ -39,6 +39,7 @@ import (
 
        "github.com/google/uuid"
        "github.com/onsi/gomega"
+       "github.com/pkg/errors"
        "github.com/spf13/cobra"
 
        appsv1 "k8s.io/api/apps/v1"
@@ -422,6 +423,18 @@ func IntegrationCondition(ns string, name string, conditionType v1.IntegrationCo
        }
 }
 
+func AssignIntegrationToOperator(ns, name, operator string) error {
+       it := Integration(ns, name)()
+       if it == nil {
+               return fmt.Errorf("cannot assign integration %q to operator: 
integration not found", name)
+       }
+       if it.Labels == nil {
+               it.Labels = make(map[string]string)
+       }
+       it.Labels[v1.OperatorIDLabel] = operator
+       return TestClient().Update(TestContext, it)
+}
+
 func Lease(ns string, name string) func() *coordination.Lease {
        return func() *coordination.Lease {
                lease := coordination.Lease{}
@@ -727,6 +740,18 @@ func ScaleKameletBinding(ns string, name string, replicas int32) error {
        })
 }
 
+func AssignKameletBindingToOperator(ns, name, operator string) error {
+       klb := KameletBinding(ns, name)()
+       if klb == nil {
+               return fmt.Errorf("cannot assign kamelet binding %q to 
operator: kamelet binding not found", name)
+       }
+       if klb.Labels == nil {
+               klb.Labels = make(map[string]string)
+       }
+       klb.Labels[v1.OperatorIDLabel] = operator
+       return TestClient().Update(TestContext, klb)
+}
+
 type KitFilter interface {
        Match(*v1.IntegrationKit) bool
 }
@@ -1043,6 +1068,18 @@ func PlatformProfile(ns string) func() v1.TraitProfile {
        }
 }
 
+func AssignPlatformToOperator(ns, operator string) error {
+       pl := Platform(ns)()
+       if pl == nil {
+               return errors.New("cannot assign platform to operator: no 
platform found")
+       }
+       if pl.Labels == nil {
+               pl.Labels = make(map[string]string)
+       }
+       pl.Labels[v1.OperatorIDLabel] = operator
+       return TestClient().Update(TestContext, pl)
+}
+
 func CRDs() func() []metav1.APIResource {
        return func() []metav1.APIResource {
 
@@ -1115,24 +1152,22 @@ func OperatorTryPodForceKill(ns string, timeSeconds int) {
        }
 }
 
-func ScaleOperator(ns string, replicas int32) func() error {
-       return func() error {
-               operator, err := 
TestClient().AppsV1().Deployments(ns).Get(TestContext, "camel-k-operator", 
metav1.GetOptions{})
-               if err != nil {
-                       return err
-               }
-               operator.Spec.Replicas = &replicas
-               _, err = 
TestClient().AppsV1().Deployments(ns).Update(TestContext, operator, 
metav1.UpdateOptions{})
-               if err != nil {
-                       return err
-               }
+func ScaleOperator(ns string, replicas int32) error {
+       operator, err := TestClient().AppsV1().Deployments(ns).Get(TestContext, 
"camel-k-operator", metav1.GetOptions{})
+       if err != nil {
+               return err
+       }
+       operator.Spec.Replicas = &replicas
+       _, err = TestClient().AppsV1().Deployments(ns).Update(TestContext, 
operator, metav1.UpdateOptions{})
+       if err != nil {
+               return err
+       }
 
-               if replicas == 0 {
-                       // speedup scale down by killing the pod
-                       OperatorTryPodForceKill(ns, 10)
-               }
-               return nil
+       if replicas == 0 {
+               // speedup scale down by killing the pod
+               OperatorTryPodForceKill(ns, 10)
        }
+       return nil
 }
 
 func ClusterRole() func() []rbacv1.ClusterRole {
diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go
index c6d85be..8c2cada 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -22,7 +22,10 @@ import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-const TraitAnnotationPrefix = "trait.camel.apache.org/"
+const (
+       TraitAnnotationPrefix = "trait.camel.apache.org/"
+       OperatorIDLabel       = "camel.apache.org/operator.id"
+)
 
 // ConfigurationSpec --
 type ConfigurationSpec struct {
diff --git a/pkg/controller/build/build_controller.go b/pkg/controller/build/build_controller.go
index 99ca1d0..e8c1a2d 100644
--- a/pkg/controller/build/build_controller.go
+++ b/pkg/controller/build/build_controller.go
@@ -30,7 +30,6 @@ import (
        ctrl "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/manager"
-       "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
@@ -71,7 +70,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                Named("build-controller").
                // Watch for changes to primary resource Build
                For(&v1.Build{}, builder.WithPredicates(
-                       predicate.Funcs{
+                       platform.FilteringFuncs{
                                UpdateFunc: func(e event.UpdateEvent) bool {
                                        oldBuild := e.ObjectOld.(*v1.Build)
                                        newBuild := e.ObjectNew.(*v1.Build)
@@ -130,6 +129,12 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, request reconcile.Reques
                return reconcile.Result{}, err
        }
 
+       // Only process resources assigned to the operator
+       if !platform.IsOperatorHandler(&instance) {
+               rlog.Info("Ignoring request because resource is not assigned to 
current operator")
+               return reconcile.Result{}, nil
+       }
+
        target := instance.DeepCopy()
        targetLog := rlog.ForBuild(target)
 
diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go
index cb67e53..239cbba 100644
--- a/pkg/controller/integration/integration_controller.go
+++ b/pkg/controller/integration/integration_controller.go
@@ -36,7 +36,6 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
-       "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
 
@@ -81,7 +80,7 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error {
                Named("integration-controller").
                // Watch for changes to primary resource Integration
                For(&v1.Integration{}, builder.WithPredicates(
-                       predicate.Funcs{
+                       platform.FilteringFuncs{
                                UpdateFunc: func(e event.UpdateEvent) bool {
                                        old := e.ObjectOld.(*v1.Integration)
                                        it := e.ObjectNew.(*v1.Integration)
@@ -262,6 +261,12 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile.
                return reconcile.Result{}, err
        }
 
+       // Only process resources assigned to the operator
+       if !platform.IsOperatorHandler(&instance) {
+               rlog.Info("Ignoring request because resource is not assigned to 
current operator")
+               return reconcile.Result{}, nil
+       }
+
        target := instance.DeepCopy()
        targetLog := rlog.ForIntegration(target)
 
diff --git a/pkg/controller/integrationkit/build.go b/pkg/controller/integrationkit/build.go
index 4e8a688..5be8f7a 100644
--- a/pkg/controller/integrationkit/build.go
+++ b/pkg/controller/integrationkit/build.go
@@ -23,6 +23,7 @@ import (
        "strings"
        "time"
 
+       "github.com/apache/camel-k/pkg/util/defaults"
        "github.com/pkg/errors"
 
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -85,6 +86,10 @@ func (action *buildAction) handleBuildSubmitted(ctx context.Context, kit *v1.Int
 
                labels := kubernetes.FilterCamelCreatorLabels(kit.Labels)
                labels[v1.IntegrationKitLayoutLabel] = 
kit.Labels[v1.IntegrationKitLayoutLabel]
+               operatorID := defaults.OperatorID()
+               if operatorID != "" {
+                       labels[v1.OperatorIDLabel] = operatorID
+               }
                timeout := env.Platform.Status.Build.GetTimeout()
                if layout := labels[v1.IntegrationKitLayoutLabel]; 
env.Platform.Spec.Build.Timeout == nil && layout == 
v1.IntegrationKitLayoutNative {
                        // Increase the timeout to a sensible default
diff --git a/pkg/controller/integrationkit/integrationkit_controller.go b/pkg/controller/integrationkit/integrationkit_controller.go
index 26b969b..1eebefa 100644
--- a/pkg/controller/integrationkit/integrationkit_controller.go
+++ b/pkg/controller/integrationkit/integrationkit_controller.go
@@ -31,7 +31,6 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
-       "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
 
@@ -81,7 +80,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
        // Watch for changes to primary resource IntegrationKit
        err = c.Watch(&source.Kind{Type: &v1.IntegrationKit{}},
                &handler.EnqueueRequestForObject{},
-               predicate.Funcs{
+               platform.FilteringFuncs{
                        UpdateFunc: func(e event.UpdateEvent) bool {
                                oldIntegrationKit := 
e.ObjectOld.(*v1.IntegrationKit)
                                newIntegrationKit := 
e.ObjectNew.(*v1.IntegrationKit)
@@ -107,7 +106,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                        IsController: true,
                        OwnerType:    &v1.IntegrationKit{},
                },
-               predicate.Funcs{
+               platform.FilteringFuncs{
                        UpdateFunc: func(e event.UpdateEvent) bool {
                                oldBuild := e.ObjectOld.(*v1.Build)
                                newBuild := e.ObjectNew.(*v1.Build)
@@ -202,6 +201,12 @@ func (r *reconcileIntegrationKit) Reconcile(ctx context.Context, request reconci
                return reconcile.Result{}, err
        }
 
+       // Only process resources assigned to the operator
+       if !platform.IsOperatorHandler(&instance) {
+               rlog.Info("Ignoring request because resource is not assigned to 
current operator")
+               return reconcile.Result{}, nil
+       }
+
        target := instance.DeepCopy()
        targetLog := rlog.ForIntegrationKit(target)
 
diff --git a/pkg/controller/integrationplatform/integrationplatform_controller.go b/pkg/controller/integrationplatform/integrationplatform_controller.go
index 264ae4d..66880df 100644
--- a/pkg/controller/integrationplatform/integrationplatform_controller.go
+++ b/pkg/controller/integrationplatform/integrationplatform_controller.go
@@ -31,7 +31,6 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
-       "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
 
@@ -80,7 +79,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
        // Watch for changes to primary resource IntegrationPlatform
        err = c.Watch(&source.Kind{Type: &v1.IntegrationPlatform{}},
                &handler.EnqueueRequestForObject{},
-               predicate.Funcs{
+               platform.FilteringFuncs{
                        UpdateFunc: func(e event.UpdateEvent) bool {
                                oldIntegrationPlatform := 
e.ObjectOld.(*v1.IntegrationPlatform)
                                newIntegrationPlatform := 
e.ObjectNew.(*v1.IntegrationPlatform)
@@ -149,6 +148,12 @@ func (r *reconcileIntegrationPlatform) Reconcile(ctx context.Context, request re
                return reconcile.Result{}, err
        }
 
+       // Only process resources assigned to the operator
+       if !platform.IsOperatorHandler(&instance) {
+               rlog.Info("Ignoring request because resource is not assigned to 
current operator")
+               return reconcile.Result{}, nil
+       }
+
        actions := []Action{
                NewInitializeAction(),
                NewWarmAction(r.reader),
diff --git a/pkg/controller/kamelet/kamelet_controller.go b/pkg/controller/kamelet/kamelet_controller.go
index fc12f19..5bc2a07 100644
--- a/pkg/controller/kamelet/kamelet_controller.go
+++ b/pkg/controller/kamelet/kamelet_controller.go
@@ -32,7 +32,6 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/controller"
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/manager"
-       "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -72,7 +71,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                Named("kamelet-controller").
                // Watch for changes to primary resource Kamelet
                For(&v1alpha1.Kamelet{}, builder.WithPredicates(
-                       predicate.Funcs{
+                       platform.FilteringFuncs{
                                UpdateFunc: func(e event.UpdateEvent) bool {
                                        oldKamelet := 
e.ObjectOld.(*v1alpha1.Kamelet)
                                        newKamelet := 
e.ObjectNew.(*v1alpha1.Kamelet)
@@ -137,6 +136,12 @@ func (r *reconcileKamelet) Reconcile(ctx context.Context, request reconcile.Requ
                return reconcile.Result{}, err
        }
 
+       // Only process resources assigned to the operator
+       if !platform.IsOperatorHandler(&instance) {
+               rlog.Info("Ignoring request because resource is not assigned to 
current operator")
+               return reconcile.Result{}, nil
+       }
+
        actions := []Action{
                NewInitializeAction(),
                NewMonitorAction(),
diff --git a/pkg/controller/kameletbinding/kamelet_binding_controller.go b/pkg/controller/kameletbinding/kamelet_binding_controller.go
index ff23f65..1432ee9 100644
--- a/pkg/controller/kameletbinding/kamelet_binding_controller.go
+++ b/pkg/controller/kameletbinding/kamelet_binding_controller.go
@@ -31,7 +31,6 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/manager"
-       "sigs.k8s.io/controller-runtime/pkg/predicate"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
        "sigs.k8s.io/controller-runtime/pkg/source"
 
@@ -78,7 +77,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
        // Watch for changes to primary resource KameletBinding
        err = c.Watch(&source.Kind{Type: &v1alpha1.KameletBinding{}},
                &handler.EnqueueRequestForObject{},
-               predicate.Funcs{
+               platform.FilteringFuncs{
                        UpdateFunc: func(e event.UpdateEvent) bool {
                                oldKameletBinding := 
e.ObjectOld.(*v1alpha1.KameletBinding)
                                newKameletBinding := 
e.ObjectNew.(*v1alpha1.KameletBinding)
@@ -154,6 +153,12 @@ func (r *ReconcileKameletBinding) Reconcile(ctx context.Context, request reconci
                return reconcile.Result{}, err
        }
 
+       // Only process resources assigned to the operator
+       if !platform.IsOperatorHandler(&instance) {
+               rlog.Info("Ignoring request because resource is not assigned to 
current operator")
+               return reconcile.Result{}, nil
+       }
+
        actions := []Action{
                NewInitializeAction(),
                NewMonitorAction(),
diff --git a/pkg/platform/operator.go b/pkg/platform/operator.go
index 8842ab6..d34c898 100644
--- a/pkg/platform/operator.go
+++ b/pkg/platform/operator.go
@@ -22,8 +22,12 @@ import (
        "os"
        "strings"
 
+       camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/util/defaults"
        coordination "k8s.io/api/coordination/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "sigs.k8s.io/controller-runtime/pkg/event"
+       "sigs.k8s.io/controller-runtime/pkg/predicate"
 
        ctrl "sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -99,3 +103,76 @@ func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace
        }
        return !alreadyOwned, nil
 }
+
+func IsOperatorHandler(object ctrl.Object) bool {
+       if object == nil {
+               return true
+       }
+       resourceID := object.GetLabels()[camelv1.OperatorIDLabel]
+       operatorID := defaults.OperatorID()
+       return resourceID == operatorID
+}
+
+// FilteringFuncs do preliminary checks to determine if certain events should be handled by the controller
+// based on labels on the resources (e.g. camel.apache.org/operator.id) and the operator configuration,
+// before handing the computation over to the user code.
+type FilteringFuncs struct {
+       // Create returns true if the Create event should be processed
+       CreateFunc func(event.CreateEvent) bool
+
+       // Delete returns true if the Delete event should be processed
+       DeleteFunc func(event.DeleteEvent) bool
+
+       // Update returns true if the Update event should be processed
+       UpdateFunc func(event.UpdateEvent) bool
+
+       // Generic returns true if the Generic event should be processed
+       GenericFunc func(event.GenericEvent) bool
+}
+
+func (f FilteringFuncs) Create(e event.CreateEvent) bool {
+       if !IsOperatorHandler(e.Object) {
+               return false
+       }
+       if f.CreateFunc != nil {
+               return f.CreateFunc(e)
+       }
+       return true
+}
+
+func (f FilteringFuncs) Delete(e event.DeleteEvent) bool {
+       if !IsOperatorHandler(e.Object) {
+               return false
+       }
+       if f.DeleteFunc != nil {
+               return f.DeleteFunc(e)
+       }
+       return true
+}
+
+func (f FilteringFuncs) Update(e event.UpdateEvent) bool {
+       if !IsOperatorHandler(e.ObjectNew) {
+               return false
+       }
+       if e.ObjectOld != nil && e.ObjectNew != nil &&
+               e.ObjectOld.GetLabels()[camelv1.OperatorIDLabel] != e.ObjectNew.GetLabels()[camelv1.OperatorIDLabel] {
+               // Always force reconciliation when the object becomes managed by the current operator
+               return true
+       }
+       if f.UpdateFunc != nil {
+               return f.UpdateFunc(e)
+       }
+       return true
+}
+
+func (f FilteringFuncs) Generic(e event.GenericEvent) bool {
+       if !IsOperatorHandler(e.Object) {
+               return false
+       }
+       if f.GenericFunc != nil {
+               return f.GenericFunc(e)
+       }
+       return true
+}
+
+var _ predicate.Predicate = FilteringFuncs{}
diff --git a/pkg/trait/container.go b/pkg/trait/container.go
index 9685e17..ea4973f 100644
--- a/pkg/trait/container.go
+++ b/pkg/trait/container.go
@@ -22,6 +22,7 @@ import (
        "path"
        "sort"
 
+       "github.com/apache/camel-k/pkg/util/defaults"
        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/api/batch/v1beta1"
        corev1 "k8s.io/api/core/v1"
@@ -215,6 +216,11 @@ func (t *containerTrait) configureImageIntegrationKit(e *Environment) error {
                        kubernetes.CamelCreatorLabelVersion:   e.Integration.ResourceVersion,
                }
 
+               operatorID := defaults.OperatorID()
+               if operatorID != "" {
+                       kit.Labels[v1.OperatorIDLabel] = operatorID
+               }
+
                t.L.Infof("image %s", kit.Spec.Image)
                e.Resources.Add(kit)
                e.Integration.SetIntegrationKit(kit)
diff --git a/pkg/trait/quarkus.go b/pkg/trait/quarkus.go
index f10242d..2b53549 100644
--- a/pkg/trait/quarkus.go
+++ b/pkg/trait/quarkus.go
@@ -261,6 +261,11 @@ func (t *quarkusTrait) newIntegrationKit(e *Environment, packageType quarkusPack
                kubernetes.CamelCreatorLabelVersion:   integration.ResourceVersion,
        }
 
+       operatorID := defaults.OperatorID()
+       if operatorID != "" {
+               kit.Labels[v1.OperatorIDLabel] = operatorID
+       }
+
        traits := t.getKitTraits(e)
 
        kit.Spec = v1.IntegrationKitSpec{
diff --git a/pkg/util/defaults/defaults_support.go b/pkg/util/defaults/defaults_support.go
index 6849c15..a50ac2a 100644
--- a/pkg/util/defaults/defaults_support.go
+++ b/pkg/util/defaults/defaults_support.go
@@ -32,6 +32,10 @@ func InstallDefaultKamelets() bool {
        return boolEnvOrDefault(installDefaultKamelets, "KAMEL_INSTALL_DEFAULT_KAMELETS")
 }
 
+func OperatorID() string {
+       return envOrDefault("", "KAMEL_OPERATOR_ID")
+}
+
 func boolEnvOrDefault(def bool, envs ...string) bool {
        strVal := envOrDefault(strconv.FormatBool(def), envs...)
        res, err := strconv.ParseBool(strVal)
diff --git a/pkg/util/defaults/defaults_test.go b/pkg/util/defaults/defaults_test.go
index e49611e..1bc8dbb 100644
--- a/pkg/util/defaults/defaults_test.go
+++ b/pkg/util/defaults/defaults_test.go
@@ -49,3 +49,12 @@ func TestOverriddenInstallDefaultKamelets(t *testing.T) {
        assert.False(t, InstallDefaultKamelets())
        assert.NoError(t, os.Setenv(env, oldEnvVal))
 }
+
+func TestOverriddenOperatorID(t *testing.T) {
+       env := "KAMEL_OPERATOR_ID"
+       oldEnvVal := os.Getenv(env)
+       overriddenID := "operator-1"
+       assert.NoError(t, os.Setenv(env, overriddenID))
+       assert.Equal(t, overriddenID, OperatorID())
+       assert.NoError(t, os.Setenv(env, oldEnvVal))
+}

Reply via email to