This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new 082835ee1 feat(util): allow Kamelet crossnamespace in Pipes
082835ee1 is described below

commit 082835ee11693ddf4589bdaafbfd921a367035ae
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Wed Oct 15 08:36:16 2025 +0200

    feat(util): allow Kamelet crossnamespace in Pipes
    
    Closes #6303
---
 pkg/util/bindings/api.go          |   1 +
 pkg/util/bindings/catalog.go      |  10 ++--
 pkg/util/bindings/catalog_test.go | 123 ++++++++++++++++++++------------------
 pkg/util/bindings/kamelet.go      |  19 ++++--
 pkg/util/bindings/kamelet_test.go |  64 ++++++++++++++++++++
 pkg/util/bindings/strimzi.go      |  28 ++++++---
 pkg/util/source/kamelet.go        |  35 ++++++-----
 7 files changed, 189 insertions(+), 91 deletions(-)

diff --git a/pkg/util/bindings/api.go b/pkg/util/bindings/api.go
index 56dadc0a5..47dbe7ff2 100644
--- a/pkg/util/bindings/api.go
+++ b/pkg/util/bindings/api.go
@@ -39,6 +39,7 @@ type Binding struct {
        // Step is to support complex mapping such as Camel's EIPs
        Step map[string]interface{}
        // Traits is a partial trait specification that should be merged into 
the integration
+       // Deprecated: will be removed in future releases
        Traits v1.Traits
        // ApplicationProperties contain properties that should be set on the 
integration for the binding to work
        ApplicationProperties map[string]string
diff --git a/pkg/util/bindings/catalog.go b/pkg/util/bindings/catalog.go
index d05cdb3ae..726216c5e 100644
--- a/pkg/util/bindings/catalog.go
+++ b/pkg/util/bindings/catalog.go
@@ -23,7 +23,6 @@ import (
        "sort"
 
        v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
-       "github.com/apache/camel-k/v2/pkg/platform"
        "k8s.io/utils/ptr"
 )
 
@@ -73,12 +72,13 @@ func validateEndpoint(ctx BindingContext, e v1.Endpoint) 
error {
                return errors.New("cannot use both ref and URI to specify an 
endpoint: only one of them should be used")
        }
        if e.Ref != nil && e.Ref.Namespace != "" && e.Ref.Namespace != 
ctx.Namespace {
-               // referencing default Kamelets in operator namespace is allowed
-               if e.Ref.Kind == v1.KameletKind && e.Ref.Namespace == 
platform.GetOperatorNamespace() {
-                       return nil
+               if ok, err := isKnownKnativeResource(e.Ref); ok {
+                       if err != nil {
+                               return err
+                       }
+                       return errors.New("cross-namespace Pipe references are 
not allowed for Knative")
                }
 
-               return errors.New("cross-namespace references are not allowed 
in Pipe")
        }
        return nil
 }
diff --git a/pkg/util/bindings/catalog_test.go 
b/pkg/util/bindings/catalog_test.go
index d9d235150..53538280d 100644
--- a/pkg/util/bindings/catalog_test.go
+++ b/pkg/util/bindings/catalog_test.go
@@ -96,76 +96,25 @@ func TestValidateEndpoint(t *testing.T) {
                                },
                        },
                },
-       }
-
-       for i, tc := range testcases {
-               t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) 
{
-                       if tc.operatorNamespace != "" {
-                               t.Setenv("NAMESPACE", tc.operatorNamespace)
-                       }
-
-                       ctx, cancel := context.WithCancel(context.Background())
-                       defer cancel()
-
-                       client, err := internal.NewFakeClient()
-                       require.NoError(t, err)
-
-                       bindingContext := BindingContext{
-                               Ctx:       ctx,
-                               Client:    client,
-                               Namespace: tc.namespace,
-                               Profile:   v1.DefaultTraitProfile,
-                       }
-
-                       err = validateEndpoint(bindingContext, tc.endpoint)
-                       require.NoError(t, err)
-               })
-       }
-}
-
-func TestValidateEndpointError(t *testing.T) {
-       uri := "log:info"
-
-       testcases := []struct {
-               name              string
-               namespace         string
-               operatorNamespace string
-               endpoint          v1.Endpoint
-       }{
                {
-                       name:      "kamelet-ref-and-uri",
+                       name:      "it-ref",
                        namespace: "test",
                        endpoint: v1.Endpoint{
-                               URI: &uri,
                                Ref: &corev1.ObjectReference{
-                                       Kind:       v1.KameletKind,
+                                       Kind:       v1.IntegrationKind,
                                        APIVersion: 
v1.SchemeGroupVersion.String(),
-                                       Name:       "foo-kamelet",
+                                       Name:       "foo-it",
                                },
                        },
                },
                {
-                       name:      "kamelet-ref-cross-namespace",
+                       name:      "pipe-ref",
                        namespace: "test",
                        endpoint: v1.Endpoint{
                                Ref: &corev1.ObjectReference{
-                                       Kind:       v1.KameletKind,
-                                       Namespace:  "other",
+                                       Kind:       v1.PipeKind,
                                        APIVersion: 
v1.SchemeGroupVersion.String(),
-                                       Name:       "foo-kamelet",
-                               },
-                       },
-               },
-               {
-                       name:              
"knative-broker-ref-in-operator-namespace",
-                       namespace:         "test",
-                       operatorNamespace: "global",
-                       endpoint: v1.Endpoint{
-                               Ref: &corev1.ObjectReference{
-                                       Kind:       "Broker",
-                                       Namespace:  "global",
-                                       APIVersion: 
eventing.SchemeGroupVersion.String(),
-                                       Name:       "foo-broker",
+                                       Name:       "foo-pipe",
                                },
                        },
                },
@@ -191,7 +140,65 @@ func TestValidateEndpointError(t *testing.T) {
                        }
 
                        err = validateEndpoint(bindingContext, tc.endpoint)
-                       require.Error(t, err, "cross-namespace references are 
not allowed in Pipe")
+                       require.NoError(t, err)
                })
        }
 }
+
+func TestValidateEndpointErrorRefURI(t *testing.T) {
+       uri := "log:info"
+
+       endpoint := v1.Endpoint{
+               URI: &uri,
+               Ref: &corev1.ObjectReference{
+                       Kind:       v1.KameletKind,
+                       APIVersion: v1.SchemeGroupVersion.String(),
+                       Name:       "foo-kamelet",
+               },
+       }
+
+       bindingContext := BindingContext{
+               Namespace: "default",
+       }
+
+       err := validateEndpoint(bindingContext, endpoint)
+       require.Error(t, err)
+       require.Equal(t, "cannot use both ref and URI to specify an endpoint: 
only one of them should be used", err.Error())
+}
+
+func TestValidateEndpointKameletCrossNS(t *testing.T) {
+       endpoint := v1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Kind:       v1.KameletKind,
+                       APIVersion: v1.SchemeGroupVersion.String(),
+                       Name:       "foo-kamelet",
+                       Namespace:  "kamelet-ns",
+               },
+       }
+
+       bindingContext := BindingContext{
+               Namespace: "default",
+       }
+
+       err := validateEndpoint(bindingContext, endpoint)
+       require.NoError(t, err)
+}
+
+func TestValidateEndpointErrorKnativeCrossNS(t *testing.T) {
+       endpoint := v1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Kind:       "Broker",
+                       Namespace:  "knative-ns",
+                       APIVersion: eventing.SchemeGroupVersion.String(),
+                       Name:       "foo-broker",
+               },
+       }
+
+       bindingContext := BindingContext{
+               Namespace: "default",
+       }
+
+       err := validateEndpoint(bindingContext, endpoint)
+       require.Error(t, err)
+       require.Equal(t, "cross-namespace Pipe references are not allowed for 
Knative", err.Error())
+}
diff --git a/pkg/util/bindings/kamelet.go b/pkg/util/bindings/kamelet.go
index 1c6bad97b..f96421612 100644
--- a/pkg/util/bindings/kamelet.go
+++ b/pkg/util/bindings/kamelet.go
@@ -23,6 +23,7 @@ import (
        "strings"
 
        v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/util/source"
 
        "k8s.io/apimachinery/pkg/runtime/schema"
 )
@@ -62,18 +63,26 @@ func (k BindingConverter) Translate(ctx BindingContext, 
endpointCtx EndpointCont
                return nil, err
        }
 
+       // Set id, if specified
        id, idPresent := props[v1.KameletIDProperty]
        if idPresent {
                delete(props, v1.KameletIDProperty)
        } else {
                id = endpointCtx.GenerateID()
        }
+       // Set version, if specified
        version, versionPresent := props[v1.KameletVersionProperty]
        if versionPresent {
                delete(props, v1.KameletVersionProperty)
        }
 
-       kameletTranslated := getKameletName(kameletName, id, version)
+       // Set namespace, if specified and different from the actual context
+       namespace := ""
+       if e.Ref.Namespace != "" && e.Ref.Namespace != ctx.Namespace {
+               namespace = e.Ref.Namespace
+       }
+
+       kameletTranslated := getKameletName(kameletName, id, version, namespace)
 
        binding := Binding{}
        binding.ApplicationProperties = make(map[string]string)
@@ -146,12 +155,10 @@ func (k BindingConverter) Translate(ctx BindingContext, 
endpointCtx EndpointCont
        return &binding, nil
 }
 
-func getKameletName(name, id, version string) string {
+// getKameletName returns the kamelet with its name and querystring attached.
+func getKameletName(name, id, version, namespace string) string {
        kamelet := fmt.Sprintf("%s/%s", name, url.PathEscape(id))
-       if version != "" {
-               kamelet = fmt.Sprintf("%s?%s=%s", kamelet, 
v1.KameletVersionProperty, version)
-       }
-       return kamelet
+       return source.GetKameletQuerystring(kamelet, version, namespace)
 }
 
 // DataTypeStep --.
diff --git a/pkg/util/bindings/kamelet_test.go 
b/pkg/util/bindings/kamelet_test.go
index dcd8bf7cb..6a6faa5e5 100644
--- a/pkg/util/bindings/kamelet_test.go
+++ b/pkg/util/bindings/kamelet_test.go
@@ -429,3 +429,67 @@ func getExpectedStep(withIn bool, withOut bool, 
dataTypeActionKamelet string) ma
                },
        }
 }
+
+func TestBindingCrossNamespacedKamelets(t *testing.T) {
+       client, err := internal.NewFakeClient()
+       require.NoError(t, err)
+
+       endpoint := v1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Kind:       "Kamelet",
+                       APIVersion: "camel.apache.org/v1any1",
+                       Name:       "mykamelet",
+                       Namespace:  "extra",
+               },
+       }
+
+       binding, err := BindingConverter{}.Translate(
+               BindingContext{
+                       Ctx:       context.Background(),
+                       Client:    client,
+                       Namespace: "test",
+                       Profile:   v1.TraitProfileKubernetes,
+               },
+               EndpointContext{
+                       Type: v1.EndpointTypeSource,
+               },
+               endpoint)
+
+       require.NoError(t, err)
+       assert.NotNil(t, binding)
+       assert.Equal(t, "kamelet:mykamelet/source?kameletNamespace=extra", 
binding.URI)
+}
+
+func TestBindingCrossNamespacedAndVersionedKamelets(t *testing.T) {
+       client, err := internal.NewFakeClient()
+       require.NoError(t, err)
+
+       endpoint := v1.Endpoint{
+               Ref: &corev1.ObjectReference{
+                       Kind:       "Kamelet",
+                       APIVersion: "camel.apache.org/v1any1",
+                       Name:       "mykamelet",
+                       Namespace:  "extra",
+               },
+               Properties: asEndpointProperties(
+                       map[string]string{"kameletVersion": "v1"},
+               ),
+       }
+
+       binding, err := BindingConverter{}.Translate(
+               BindingContext{
+                       Ctx:       context.Background(),
+                       Client:    client,
+                       Namespace: "test",
+                       Profile:   v1.TraitProfileKubernetes,
+               },
+               EndpointContext{
+                       Type: v1.EndpointTypeSource,
+               },
+               endpoint)
+
+       require.NoError(t, err)
+       assert.NotNil(t, binding)
+       assert.Equal(t, 
"kamelet:mykamelet/source?kameletVersion=v1&kameletNamespace=extra", 
binding.URI)
+       assert.Empty(t, binding.ApplicationProperties)
+}
diff --git a/pkg/util/bindings/strimzi.go b/pkg/util/bindings/strimzi.go
index aac175ab0..5f76d3bfc 100644
--- a/pkg/util/bindings/strimzi.go
+++ b/pkg/util/bindings/strimzi.go
@@ -97,6 +97,7 @@ func (s StrimziBindingProvider) fromKafkaToCamel(ctx 
BindingContext, endpoint ca
        }
        topicName := props["topic"]
        delete(props, "topic")
+       //nolint:nestif
        if props["brokers"] == "" {
                // build the client if needed
                if s.Client == nil {
@@ -106,7 +107,12 @@ func (s StrimziBindingProvider) fromKafkaToCamel(ctx 
BindingContext, endpoint ca
                        }
                        s.Client = kafkaClient
                }
-               bootstrapServers, err := s.getBootstrapServers(ctx, 
endpoint.Ref.Name)
+               namespace := endpoint.Ref.Namespace
+               if namespace == "" {
+                       namespace = ctx.Namespace
+               }
+
+               bootstrapServers, err := s.getBootstrapServers(ctx, 
endpoint.Ref.Name, namespace)
                if err != nil {
                        return nil, err
                }
@@ -162,7 +168,11 @@ func (s StrimziBindingProvider) lookupBootstrapServers(ctx 
BindingContext, endpo
        if clusterName == "" {
                return "", fmt.Errorf("no %q label defined on topic %s", 
v1beta2.StrimziKafkaClusterLabel, endpoint.Ref.Name)
        }
-       bootstrapServers, err := s.getBootstrapServers(ctx, clusterName)
+       namespace := endpoint.Ref.Namespace
+       if namespace == "" {
+               namespace = ctx.Namespace
+       }
+       bootstrapServers, err := s.getBootstrapServers(ctx, clusterName, 
namespace)
        if err != nil {
                return "", err
        }
@@ -170,8 +180,8 @@ func (s StrimziBindingProvider) lookupBootstrapServers(ctx 
BindingContext, endpo
        return bootstrapServers, nil
 }
 
-func (s StrimziBindingProvider) getBootstrapServers(ctx BindingContext, 
clusterName string) (string, error) {
-       cluster, err := 
s.Client.KafkaV1beta2().Kafkas(ctx.Namespace).Get(ctx.Ctx, clusterName, 
v1.GetOptions{})
+func (s StrimziBindingProvider) getBootstrapServers(ctx BindingContext, 
clusterName, namespace string) (string, error) {
+       cluster, err := s.Client.KafkaV1beta2().Kafkas(namespace).Get(ctx.Ctx, 
clusterName, v1.GetOptions{})
        if err != nil {
                return "", err
        }
@@ -190,8 +200,12 @@ func (s StrimziBindingProvider) getBootstrapServers(ctx 
BindingContext, clusterN
 }
 
 func (s StrimziBindingProvider) lookupTopic(ctx BindingContext, endpoint 
camelv1.Endpoint) (*v1beta2.KafkaTopic, error) {
+       namespace := endpoint.Ref.Namespace
+       if namespace == "" {
+               namespace = ctx.Namespace
+       }
        // first check by KafkaTopic name
-       topic, err := 
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, 
endpoint.Ref.Name, v1.GetOptions{})
+       topic, err := 
s.Client.KafkaV1beta2().KafkaTopics(namespace).Get(ctx.Ctx, endpoint.Ref.Name, 
v1.GetOptions{})
        if err != nil && !k8serrors.IsNotFound(err) {
                return nil, err
        }
@@ -200,12 +214,12 @@ func (s StrimziBindingProvider) lookupTopic(ctx 
BindingContext, endpoint camelv1
        }
 
        // if not found, then, look at the .status.topicName (it may be 
autogenerated)
-       topics, err := 
s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).List(ctx.Ctx, v1.ListOptions{
+       topics, err := 
s.Client.KafkaV1beta2().KafkaTopics(namespace).List(ctx.Ctx, v1.ListOptions{
                FieldSelector: "status.topicName=" + endpoint.Ref.Name,
        })
 
        if err != nil {
-               return nil, err
+               return nil, fmt.Errorf("couldn't find any KafkaTopic with 
either name or topicName %s; error %w", endpoint.Ref.Name, err)
        }
        if len(topics.Items) == 0 {
                return nil, fmt.Errorf("couldn't find any KafkaTopic with 
either name or topicName %s", endpoint.Ref.Name)
diff --git a/pkg/util/source/kamelet.go b/pkg/util/source/kamelet.go
index 82579aead..d5b3fcfcb 100644
--- a/pkg/util/source/kamelet.go
+++ b/pkg/util/source/kamelet.go
@@ -27,26 +27,12 @@ import (
 
 var kameletNameRegexp = 
regexp.MustCompile("kamelet:(?://)?([a-z0-9-.]+(/[a-z0-9-.]+)?)(?:$|[^a-z0-9-.].*)")
 
-//nolint:nestif
 func ExtractKamelet(uri string) string {
        matches := kameletNameRegexp.FindStringSubmatch(uri)
        if len(matches) > 1 {
                version := getKameletParam(uri, v1.KameletVersionProperty)
                namespace := getKameletParam(uri, v1.KameletNamespaceProperty)
-               if version != "" || namespace != "" {
-                       var querystring string
-                       if version != "" {
-                               querystring = v1.KameletVersionProperty + "=" + 
version
-                       }
-                       if namespace != "" {
-                               if querystring != "" {
-                                       querystring += "&"
-                               }
-                               querystring += v1.KameletNamespaceProperty + 
"=" + namespace
-                       }
-                       return fmt.Sprintf("%s?%s", matches[1], querystring)
-               }
-               return matches[1]
+               return GetKameletQuerystring(matches[1], version, namespace)
        }
        return ""
 }
@@ -67,3 +53,22 @@ func getKameletParam(uri, param string) string {
        queryParams := parsedURL.Query()
        return queryParams.Get(param)
 }
+
+// GetKameletQuerystring returns a kamelet name appended with its version and 
namespace (if provided).
+func GetKameletQuerystring(name, version, namespace string) string {
+       if version != "" || namespace != "" {
+               var querystring string
+               if version != "" {
+                       querystring = v1.KameletVersionProperty + "=" + version
+               }
+               if namespace != "" {
+                       if querystring != "" {
+                               querystring += "&"
+                       }
+                       querystring += v1.KameletNamespaceProperty + "=" + 
namespace
+               }
+               return fmt.Sprintf("%s?%s", name, querystring)
+       }
+
+       return name
+}

Reply via email to