This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push:
new 8d71b13d7 feat(strimzi): bind to either KafkaTopic name or topicName
8d71b13d7 is described below
commit 8d71b13d727658ff6390d9c60b92644ceb5dc0ef
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Mon Mar 25 15:10:39 2024 +0100
feat(strimzi): bind to either KafkaTopic name or topicName
Closes #4759
---
addons/strimzi/duck/v1beta2/duck_types.go | 6 +++
.../strimzi/duck/v1beta2/zz_generated.deepcopy.go | 16 ++++++
addons/strimzi/strimzi.go | 18 ++++++-
addons/strimzi/strimzi_test.go | 63 ++++++++++++++++++++++
helm/camel-k/templates/operator-cluster-roles.yaml | 1 +
helm/camel-k/templates/operator-role.yaml | 1 +
.../descoped/operator-cluster-role-strimzi.yaml | 1 +
.../rbac/namespaced/operator-role-strimzi.yaml | 1 +
8 files changed, 105 insertions(+), 2 deletions(-)
diff --git a/addons/strimzi/duck/v1beta2/duck_types.go b/addons/strimzi/duck/v1beta2/duck_types.go
index 9caee47c6..936c76b33 100644
--- a/addons/strimzi/duck/v1beta2/duck_types.go
+++ b/addons/strimzi/duck/v1beta2/duck_types.go
@@ -40,6 +40,12 @@ const (
type KafkaTopic struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
+ Status KafkaTopicStatus `json:"status,omitempty"`
+}
+
+// KafkaTopicStatus is the duck of a KafkaTopic status.
+type KafkaTopicStatus struct {
+ TopicName string `json:"topicName,omitempty"`
}
// +kubebuilder:object:root=true
diff --git a/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go b/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go
index 3a0464935..f12a483fd 100644
--- a/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go
+++ b/addons/strimzi/duck/v1beta2/zz_generated.deepcopy.go
@@ -107,6 +107,7 @@ func (in *KafkaTopic) DeepCopyInto(out *KafkaTopic) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+ out.Status = in.Status
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTopic.
@@ -158,3 +159,18 @@ func (in *KafkaTopicList) DeepCopyObject() runtime.Object {
}
return nil
}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *KafkaTopicStatus) DeepCopyInto(out *KafkaTopicStatus) {
+ *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTopicStatus.
+func (in *KafkaTopicStatus) DeepCopy() *KafkaTopicStatus {
+ if in == nil {
+ return nil
+ }
+ out := new(KafkaTopicStatus)
+ in.DeepCopyInto(out)
+ return out
+}
diff --git a/addons/strimzi/strimzi.go b/addons/strimzi/strimzi.go
index e7558da80..fdd17fcd5 100644
--- a/addons/strimzi/strimzi.go
+++ b/addons/strimzi/strimzi.go
@@ -23,11 +23,11 @@ import (
"github.com/apache/camel-k/v2/addons/strimzi/duck/client/internalclientset"
"github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2"
-
camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
camelv1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/util/bindings"
"github.com/apache/camel-k/v2/pkg/util/uri"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
@@ -77,7 +77,21 @@ func (s BindingProvider) Translate(ctx bindings.BindingContext, _ bindings.Endpo
// look them up
topic, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, endpoint.Ref.Name, v1.GetOptions{})
if err != nil {
- return nil, err
+ if k8serrors.IsNotFound(err) {
+ topicList, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).List(ctx.Ctx, v1.ListOptions{
+ FieldSelector: "status.topicName=" + endpoint.Ref.Name,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if len(topicList.Items) == 0 {
+ return nil, fmt.Errorf("couldn't find any KafkaTopic with either name or topicName %s", endpoint.Ref.Name)
+ }
+ // Just return the first item
+ topic = &topicList.Items[0]
+ } else {
+ return nil, err
+ }
}
clusterName := topic.Labels[v1beta2.StrimziKafkaClusterLabel]
diff --git a/addons/strimzi/strimzi_test.go b/addons/strimzi/strimzi_test.go
index 92d7a061b..967cf83c4 100644
--- a/addons/strimzi/strimzi_test.go
+++ b/addons/strimzi/strimzi_test.go
@@ -137,3 +137,66 @@ func asEndpointProperties(props map[string]string) *camelv1.EndpointProperties {
RawMessage: serialized,
}
}
+
+func TestStrimziLookupByTopicName(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ cluster := v1beta2.Kafka{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test",
+ Name: "myclusterx",
+ },
+ Status: v1beta2.KafkaStatus{
+ Listeners: []v1beta2.KafkaStatusListener{
+ {
+ Type: "tls",
+ },
+ {
+ BootstrapServers: "my-clusterx-kafka-bootstrap:9092",
+ Type: "plain",
+ },
+ },
+ },
+ }
+
+ topic := v1beta2.KafkaTopic{
+ ObjectMeta: metav1.ObjectMeta{
+ Namespace: "test",
+ Name: "mytopicy",
+ Labels: map[string]string{
+ v1beta2.StrimziKafkaClusterLabel: "myclusterx",
+ },
+ },
+ Status: v1beta2.KafkaTopicStatus{
+ TopicName: "my-topic-name",
+ },
+ }
+
+ client := fake.NewSimpleClientset(&cluster, &topic)
+ provider := BindingProvider{
+ Client: client,
+ }
+
+ bindingContext := bindings.BindingContext{
+ Ctx: ctx,
+ Namespace: "test",
+ Profile: camelv1.TraitProfileKubernetes,
+ }
+
+ endpoint := camelv1.Endpoint{
+ Ref: &v1.ObjectReference{
+ Kind: "KafkaTopic",
+ Name: "my-topic-name",
+ APIVersion: "kafka.strimzi.io/v1beta2",
+ },
+ }
+
+ binding, err := provider.Translate(bindingContext, bindings.EndpointContext{
+ Type: camelv1.EndpointTypeSink,
+ }, endpoint)
+ require.NoError(t, err)
+ assert.NotNil(t, binding)
+ assert.Equal(t, "kafka:my-topic-name?brokers=my-clusterx-kafka-bootstrap%3A9092", binding.URI)
+ assert.Equal(t, camelv1.Traits{}, binding.Traits)
+}
diff --git a/helm/camel-k/templates/operator-cluster-roles.yaml b/helm/camel-k/templates/operator-cluster-roles.yaml
index f4d970409..1fd76bf40 100644
--- a/helm/camel-k/templates/operator-cluster-roles.yaml
+++ b/helm/camel-k/templates/operator-cluster-roles.yaml
@@ -425,6 +425,7 @@ rules:
- "kafka.strimzi.io"
resources:
- kafkatopics
+ - kafkatopics/status
- kafkas
verbs:
- get
diff --git a/helm/camel-k/templates/operator-role.yaml b/helm/camel-k/templates/operator-role.yaml
index d035280f8..ff1d804a1 100644
--- a/helm/camel-k/templates/operator-role.yaml
+++ b/helm/camel-k/templates/operator-role.yaml
@@ -366,6 +366,7 @@ rules:
- kafka.strimzi.io
resources:
- kafkatopics
+ - kafkatopics/status
- kafkas
verbs:
- get
diff --git a/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml b/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml
index 9ccea139f..e27454bcf 100644
--- a/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml
+++ b/pkg/resources/config/rbac/descoped/operator-cluster-role-strimzi.yaml
@@ -26,6 +26,7 @@ rules:
- "kafka.strimzi.io"
resources:
- kafkatopics
+ - kafkatopics/status
- kafkas
verbs:
- get
diff --git a/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml b/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml
index ab0a91d70..0802191c3 100644
--- a/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml
+++ b/pkg/resources/config/rbac/namespaced/operator-role-strimzi.yaml
@@ -26,6 +26,7 @@ rules:
- "kafka.strimzi.io"
resources:
- kafkatopics
+ - kafkatopics/status
- kafkas
verbs:
- get