This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d71b13d7 feat(strimzi): bind to either KafkaTopic name or topicName
8d71b13d7 is described below

commit 8d71b13d727658ff6390d9c60b92644ceb5dc0ef
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Mon Mar 25 15:10:39 2024 +0100

    feat(strimzi): bind to either KafkaTopic name or topicName
    
    Closes #4759
---
 addons/strimzi/duck/v1beta2/duck_types.go          |  6 +++
 .../strimzi/duck/v1beta2/zz_generated.deepcopy.go  | 16 ++++++
 addons/strimzi/strimzi.go                          | 18 ++++++-
 addons/strimzi/strimzi_test.go                     | 63 ++++++++++++++++++++++
 helm/camel-k/templates/operator-cluster-roles.yaml |  1 +
 helm/camel-k/templates/operator-role.yaml          |  1 +
 .../descoped/operator-cluster-role-strimzi.yaml    |  1 +
 .../rbac/namespaced/operator-role-strimzi.yaml     |  1 +
 8 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/addons/strimzi/duck/v1beta2/duck_types.go b/addons/strimzi/duck/v1beta2/duck_types.go
index 9caee47c6..936c76b33 100644
--- a/addons/strimzi/duck/v1beta2/duck_types.go
+++ b/addons/strimzi/duck/v1beta2/duck_types.go
@@ -40,6 +40,12 @@ const (
 type KafkaTopic struct {
        metav1.TypeMeta   `json:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty"`
+       Status            KafkaTopicStatus `json:"status,omitempty"`
+}
+
+// KafkaTopicStatus is the duck of a KafkaTopic status.
+type KafkaTopicStatus struct {
+       TopicName string `json:"topicName,omitempty"`
 }
 
 // +kubebuilder:object:root=true
diff --git a/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go b/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go
index 3a0464935..f12a483fd 100644
--- a/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go
+++ b/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go
@@ -107,6 +107,7 @@ func (in *KafkaTopic) DeepCopyInto(out *KafkaTopic) {
        *out = *in
        out.TypeMeta = in.TypeMeta
        in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+       out.Status = in.Status
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTopic.
@@ -158,3 +159,18 @@ func (in *KafkaTopicList) DeepCopyObject() runtime.Object {
        }
        return nil
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *KafkaTopicStatus) DeepCopyInto(out *KafkaTopicStatus) {
+       *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTopicStatus.
+func (in *KafkaTopicStatus) DeepCopy() *KafkaTopicStatus {
+       if in == nil {
+               return nil
+       }
+       out := new(KafkaTopicStatus)
+       in.DeepCopyInto(out)
+       return out
+}
diff --git a/addons/strimzi/strimzi.go b/addons/strimzi/strimzi.go
index e7558da80..fdd17fcd5 100644
--- a/addons/strimzi/strimzi.go
+++ b/addons/strimzi/strimzi.go
@@ -23,11 +23,11 @@ import (
 
        
"github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset"
        "github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2"
-
        camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
        camelv1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
        "github.com/apache/camel-k/v2/pkg/util/bindings"
        "github.com/apache/camel-k/v2/pkg/util/uri"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime/schema"
 )
@@ -77,7 +77,21 @@ func (s BindingProvider) Translate(ctx bindings.BindingContext, _ bindings.Endpo
                // look them up
                topic, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, endpoint.Ref.Name, v1.GetOptions{})
                if err != nil {
-                       return nil, err
+                       if k8serrors.IsNotFound(err) {
+                               topicList, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).List(ctx.Ctx, v1.ListOptions{
+                                       FieldSelector: "status.topicName=" + endpoint.Ref.Name,
+                               })
+                               if err != nil {
+                                       return nil, err
+                               }
+                               if len(topicList.Items) == 0 {
+                                       return nil, fmt.Errorf("couldn't find any KafkaTopic with either name or topicName %s", endpoint.Ref.Name)
+                               }
+                               // Just return the first item
+                               topic = &topicList.Items[0]
+                       } else {
+                               return nil, err
+                       }
                }
 
                clusterName := topic.Labels[v1beta2.StrimziKafkaClusterLabel]
diff --git a/addons/strimzi/strimzi_test.go b/addons/strimzi/strimzi_test.go
index 92d7a061b..967cf83c4 100644
--- a/addons/strimzi/strimzi_test.go
+++ b/addons/strimzi/strimzi_test.go
@@ -137,3 +137,66 @@ func asEndpointProperties(props map[string]string) *camelv1.EndpointProperties {
                RawMessage: serialized,
        }
 }
+
+func TestStrimziLookupByTopicName(t *testing.T) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       cluster := v1beta2.Kafka{
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: "test",
+                       Name:      "myclusterx",
+               },
+               Status: v1beta2.KafkaStatus{
+                       Listeners: []v1beta2.KafkaStatusListener{
+                               {
+                                       Type: "tls",
+                               },
+                               {
+                                       BootstrapServers: "my-clusterx-kafka-bootstrap:9092",
+                                       Type:             "plain",
+                               },
+                       },
+               },
+       }
+
+       topic := v1beta2.KafkaTopic{
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: "test",
+                       Name:      "mytopicy",
+                       Labels: map[string]string{
+                               v1beta2.StrimziKafkaClusterLabel: "myclusterx",
+                       },
+               },
+               Status: v1beta2.KafkaTopicStatus{
+                       TopicName: "my-topic-name",
+               },
+       }
+
+       client := fake.NewSimpleClientset(&cluster, &topic)
+       provider := BindingProvider{
+               Client: client,
+       }
+
+       bindingContext := bindings.BindingContext{
+               Ctx:       ctx,
+               Namespace: "test",
+               Profile:   camelv1.TraitProfileKubernetes,
+       }
+
+       endpoint := camelv1.Endpoint{
+               Ref: &v1.ObjectReference{
+                       Kind:       "KafkaTopic",
+                       Name:       "my-topic-name",
+                       APIVersion: "kafka.strimzi.io/v1beta2",
+               },
+       }
+
+       binding, err := provider.Translate(bindingContext, bindings.EndpointContext{
+               Type: camelv1.EndpointTypeSink,
+       }, endpoint)
+       require.NoError(t, err)
+       assert.NotNil(t, binding)
+       assert.Equal(t, "kafka:my-topic-name?brokers=my-clusterx-kafka-bootstrap%3A9092", binding.URI)
+       assert.Equal(t, camelv1.Traits{}, binding.Traits)
+}
diff --git a/helm/camel-k/templates/operator-cluster-roles.yaml b/helm/camel-k/templates/operator-cluster-roles.yaml
index f4d970409..1fd76bf40 100644
--- a/helm/camel-k/templates/operator-cluster-roles.yaml
+++ b/helm/camel-k/templates/operator-cluster-roles.yaml
@@ -425,6 +425,7 @@ rules:
   - "kafka.strimzi.io"
   resources:
   - kafkatopics
+  - kafkatopics/status
   - kafkas
   verbs:
   - get
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index d035280f8..ff1d804a1 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -366,6 +366,7 @@ rules:
   - kafka.strimzi.io
   resources:
   - kafkatopics
+  - kafkatopics/status
   - kafkas
   verbs:
   - get
diff --git a/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml b/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml
index 9ccea139f..e27454bcf 100644
--- a/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml
+++ b/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml
@@ -26,6 +26,7 @@ rules:
   - "kafka.strimzi.io"
   resources:
   - kafkatopics
+  - kafkatopics/status
   - kafkas
   verbs:
   - get
diff --git a/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml b/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml
index ab0a91d70..0802191c3 100644
--- a/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml
+++ b/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml
@@ -26,6 +26,7 @@ rules:
   - "kafka.strimzi.io"
   resources:
   - kafkatopics
+  - kafkatopics/status
   - kafkas
   verbs:
   - get

Reply via email to