This is an automated email from the ASF dual-hosted git repository.

wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ca727d776c incubator-kie-tools-2774: Make the SonataFlow Operator to 
configure the Jobs related Knative Eventing Triggers to consume the events in 
order (#2775)
0ca727d776c is described below

commit 0ca727d776c3c75f7cdea09479b650fa8ac45f8e
Author: Walter Medvedeo <[email protected]>
AuthorDate: Wed Nov 27 17:03:05 2024 +0100

    incubator-kie-tools-2774: Make the SonataFlow Operator to configure the 
Jobs related Knative Eventing Triggers to consume the events in order (#2775)
---
 .../internal/controller/knative/knative.go         | 44 ++++++++++++-----
 .../controller/platform/services/services.go       | 57 ++++++++++++++--------
 .../controller/profiles/common/object_creators.go  |  2 +-
 .../sonataflowplatform_controller_test.go          |  3 --
 .../sonataflow-operator/test/e2e/platform_test.go  |  2 -
 5 files changed, 69 insertions(+), 39 deletions(-)

diff --git 
a/packages/sonataflow-operator/internal/controller/knative/knative.go 
b/packages/sonataflow-operator/internal/controller/knative/knative.go
index d215a66398f..49f2540b2d4 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -22,11 +22,13 @@ package knative
 import (
        "context"
        "fmt"
+       "strings"
 
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/rest"
+       "knative.dev/eventing/pkg/apis/eventing"
        eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
        sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
        clienteventingv1 
"knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
@@ -51,14 +53,16 @@ type Availability struct {
 }
 
 const (
-       kSink                     = "K_SINK"
-       knativeBundleVolume       = "kne-bundle-volume"
-       kCeOverRides              = "K_CE_OVERRIDES"
-       knativeServingGroup       = "serving.knative.dev"
-       knativeEventingGroup      = "eventing.knative.dev"
-       knativeEventingAPIVersion = "eventing.knative.dev/v1"
-       knativeBrokerKind         = "Broker"
-       knativeSinkProvided       = "SinkProvided"
+       kSink                                    = "K_SINK"
+       knativeBundleVolume                      = "kne-bundle-volume"
+       kCeOverRides                             = "K_CE_OVERRIDES"
+       knativeServingGroup                      = "serving.knative.dev"
+       knativeEventingGroup                     = "eventing.knative.dev"
+       knativeEventingAPIVersion                = "eventing.knative.dev/v1"
+       knativeBrokerKind                        = "Broker"
+       knativeSinkProvided                      = "SinkProvided"
+       KafkaKnativeEventingDeliveryOrder        = 
"kafka.eventing.knative.dev/delivery.order"
+       KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
 )
 
 func GetKnativeServingClient(cfg *rest.Config) 
(clientservingv1.ServingV1Interface, error) {
@@ -132,19 +136,33 @@ func getDestinationWithNamespace(dest 
*duckv1.Destination, namespace string) *du
        return dest
 }
 
-func ValidateBroker(name, namespace string) error {
+func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) {
        broker := &eventingv1.Broker{}
        if err := utils.GetClient().Get(context.TODO(), 
types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil {
                if errors.IsNotFound(err) {
-                       return fmt.Errorf("broker %s in namespace %s does not 
exist", name, namespace)
+                       return nil, fmt.Errorf("broker %s in namespace %s does 
not exist", name, namespace)
                }
-               return err
+               return nil, err
        }
        cond := broker.Status.GetCondition(apis.ConditionReady)
        if cond != nil && cond.Status == corev1.ConditionTrue {
-               return nil
+               return broker, nil
+       }
+       return nil, fmt.Errorf("broker %s in namespace %s is not ready", name, 
namespace)
+}
+
+// GetBrokerClass returns the broker class for a Knative Eventing Broker.
+func GetBrokerClass(broker *eventingv1.Broker) string {
+       if broker.Annotations == nil {
+               return ""
        }
-       return fmt.Errorf("broker %s in namespace %s is not ready", name, 
namespace)
+       return broker.Annotations[eventing.BrokerClassKey]
+}
+
+// IsKafkaBroker returns true if the class for a Knative Eventing Broker 
corresponds to a Kafka broker.
+func IsKafkaBroker(brokerClass string) bool {
+       // currently available kafka broker classes are "Kafka", and 
"KafkaNamespaced", for safety ask for the substring "Kafka".
+       return strings.Contains(brokerClass, "Kafka")
 }
 
 func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl 
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
diff --git 
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
 
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
index 97114d3e777..03427c548f6 100644
--- 
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
+++ 
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
@@ -575,12 +575,13 @@ func (d *DataIndexHandler) GetSourceBroker() 
*duckv1.Destination {
        return GetPlatformBroker(d.platform)
 }
 
-func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, 
namespace, serviceName, tag, eventType, path string, platform 
*operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
+func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations 
map[string]string, brokerName, namespace, serviceName, tag, eventType, path 
string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
        return &eventingv1.Trigger{
                ObjectMeta: metav1.ObjectMeta{
-                       Name:      
kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
-                       Namespace: namespace,
-                       Labels:    labels,
+                       Name:        
kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
+                       Namespace:   namespace,
+                       Labels:      labels,
+                       Annotations: annotations,
                },
                Spec: eventingv1.TriggerSpec{
                        Broker: brokerName,
@@ -613,7 +614,9 @@ func (d *DataIndexHandler) 
GenerateKnativeResources(platform *operatorapi.Sonata
        if len(namespace) == 0 {
                namespace = platform.Namespace
        }
-       if err := knative.ValidateBroker(brokerName, namespace); err != nil {
+       var brokerObject *eventingv1.Broker
+       var err error
+       if brokerObject, err = knative.ValidateBroker(brokerName, namespace); 
err != nil {
                event := &corev1.Event{
                        Type:    corev1.EventTypeWarning,
                        Reason:  WaitingKnativeEventing,
@@ -621,16 +624,18 @@ func (d *DataIndexHandler) 
GenerateKnativeResources(platform *operatorapi.Sonata
                }
                return nil, event, err
        }
+       annotations := make(map[string]string)
+       managedAnnotations := make(map[string]string)
+       addTriggerAnnotations(knative.GetBrokerClass(brokerObject), 
managedAnnotations)
        serviceName := d.GetServiceName()
        return []client.Object{
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-error", "ProcessInstanceErrorDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-node", "ProcessInstanceNodeDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-sla", "ProcessInstanceSLADataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-state", "ProcessInstanceStateDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-variable", "ProcessInstanceVariableDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-definition", "ProcessDefinitionEvent", 
constants.KogitoProcessDefinitionsEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-instance-multiple", "MultipleProcessInstanceDataEvent", 
constants.KogitoProcessInstancesMultiEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", 
"JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
+               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-error", "ProcessInstanceErrorDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
+               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-node", "ProcessInstanceNodeDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
+               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-state", "ProcessInstanceStateDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
+               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-variable", "ProcessInstanceVariableDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
+               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-definition", "ProcessDefinitionEvent", 
constants.KogitoProcessDefinitionsEventsPath, platform),
+               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", 
constants.KogitoProcessInstancesMultiEventsPath, platform),
+               d.newTrigger(lbl, managedAnnotations, brokerName, namespace, 
serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
 }
 
 func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
@@ -640,6 +645,12 @@ func (d JobServiceHandler) GetSourceBroker() 
*duckv1.Destination {
        return GetPlatformBroker(d.platform)
 }
 
+func addTriggerAnnotations(brokerClass string, annotations map[string]string) {
+       if knative.IsKafkaBroker(brokerClass) {
+               annotations[knative.KafkaKnativeEventingDeliveryOrder] = 
knative.KafkaKnativeEventingDeliveryOrderOrdered
+       }
+}
+
 func (d JobServiceHandler) GetSink() *duckv1.Destination {
        if d.platform.Spec.Services.JobService.Sink != nil {
                return d.platform.Spec.Services.JobService.Sink
@@ -658,7 +669,9 @@ func (j *JobServiceHandler) 
GenerateKnativeResources(platform *operatorapi.Sonat
                if len(namespace) == 0 {
                        namespace = platform.Namespace
                }
-               if err := knative.ValidateBroker(brokerName, namespace); err != 
nil {
+               var brokerObject *eventingv1.Broker
+               var err error
+               if brokerObject, err = knative.ValidateBroker(brokerName, 
namespace); err != nil {
                        event := &corev1.Event{
                                Type:    corev1.EventTypeWarning,
                                Reason:  WaitingKnativeEventing,
@@ -666,11 +679,14 @@ func (j *JobServiceHandler) 
GenerateKnativeResources(platform *operatorapi.Sonat
                        }
                        return nil, event, err
                }
+               annotations := make(map[string]string)
+               addTriggerAnnotations(knative.GetBrokerClass(brokerObject), 
annotations)
                jobCreateTrigger := &eventingv1.Trigger{
                        ObjectMeta: metav1.ObjectMeta{
-                               Name:      
kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
-                               Namespace: namespace,
-                               Labels:    lbl,
+                               Name:        
kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
+                               Namespace:   namespace,
+                               Labels:      lbl,
+                               Annotations: annotations,
                        },
                        Spec: eventingv1.TriggerSpec{
                                Broker: brokerName,
@@ -695,9 +711,10 @@ func (j *JobServiceHandler) 
GenerateKnativeResources(platform *operatorapi.Sonat
                resultObjs = append(resultObjs, jobCreateTrigger)
                jobDeleteTrigger := &eventingv1.Trigger{
                        ObjectMeta: metav1.ObjectMeta{
-                               Name:      
kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
-                               Namespace: namespace,
-                               Labels:    lbl,
+                               Name:        
kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
+                               Namespace:   namespace,
+                               Labels:      lbl,
+                               Annotations: annotations,
                        },
                        Spec: eventingv1.TriggerSpec{
                                Broker: brokerName,
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
 
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
index e61881ebf62..396c270fa68 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
@@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf 
*operatorapi.SonataFl
                        // No broker configured for the eventType. Skip and 
will not create trigger for it.
                        continue
                }
-               if err := knative.ValidateBroker(brokerRef.Name, 
brokerRef.Namespace); err != nil {
+               if _, err := knative.ValidateBroker(brokerRef.Name, 
brokerRef.Namespace); err != nil {
                        return nil, err
                }
                // construct eventingv1.Trigger
diff --git 
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
 
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
index 925d84a98f4..cb983e82f51 100644
--- 
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
+++ 
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
@@ -920,7 +920,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
                validateTrigger(t, cl, "data-index-process-definition-", 
ksp.Namespace, ksp, trigger)
                validateTrigger(t, cl, "data-index-process-error-", 
ksp.Namespace, ksp, trigger)
                validateTrigger(t, cl, "data-index-process-node-", 
ksp.Namespace, ksp, trigger)
-               validateTrigger(t, cl, "data-index-process-sla-", 
ksp.Namespace, ksp, trigger)
                validateTrigger(t, cl, "data-index-process-state-", 
ksp.Namespace, ksp, trigger)
                validateTrigger(t, cl, "data-index-process-variable-", 
ksp.Namespace, ksp, trigger)
 
@@ -1034,8 +1033,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
                assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
                validateTrigger(t, cl, "data-index-process-node-", 
ksp.Namespace, ksp, trigger)
                assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
-               validateTrigger(t, cl, "data-index-process-sla-", 
ksp.Namespace, ksp, trigger)
-               assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
                validateTrigger(t, cl, "data-index-process-state-", 
ksp.Namespace, ksp, trigger)
                assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
                validateTrigger(t, cl, "data-index-process-variable-", 
ksp.Namespace, ksp, trigger)
diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go 
b/packages/sonataflow-operator/test/e2e/platform_test.go
index d2a9ce433ca..c7074176d85 100644
--- a/packages/sonataflow-operator/test/e2e/platform_test.go
+++ b/packages/sonataflow-operator/test/e2e/platform_test.go
@@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ", 
Label("platform"), Ordered, func() {
                Expect(err).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-error-", 
constants.KogitoProcessInstancesEventsPath, targetNamespace, 
"di-source")).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-node-", 
constants.KogitoProcessInstancesEventsPath, targetNamespace, 
"di-source")).NotTo(HaveOccurred())
-               Expect(verifyTrigger(triggers, "data-index-process-sla-", 
constants.KogitoProcessInstancesEventsPath, targetNamespace, 
"di-source")).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-state-", 
constants.KogitoProcessInstancesEventsPath, targetNamespace, 
"di-source")).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-variable-", 
constants.KogitoProcessInstancesEventsPath, targetNamespace, 
"di-source")).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, 
"data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, 
targetNamespace, "di-source")).NotTo(HaveOccurred())
@@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ", 
Label("platform"), Ordered, func() {
                Expect(err).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-error-", 
constants.KogitoProcessInstancesEventsPath, brokerNamespace, 
brokerName)).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-node-", 
constants.KogitoProcessInstancesEventsPath, brokerNamespace, 
brokerName)).NotTo(HaveOccurred())
-               Expect(verifyTrigger(triggers, "data-index-process-sla-", 
constants.KogitoProcessInstancesEventsPath, brokerNamespace, 
brokerName)).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-state-", 
constants.KogitoProcessInstancesEventsPath, brokerNamespace, 
brokerName)).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, "data-index-process-variable-", 
constants.KogitoProcessInstancesEventsPath, brokerNamespace, 
brokerName)).NotTo(HaveOccurred())
                Expect(verifyTrigger(triggers, 
"data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath, 
brokerNamespace, brokerName)).NotTo(HaveOccurred())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to