This is an automated email from the ASF dual-hosted git repository.
wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git
The following commit(s) were added to refs/heads/main by this push:
new 0ca727d776c incubator-kie-tools-2774: Make the SonataFlow Operator to
configure the Jobs related Knative Eventing Triggers to consume the events in
order (#2775)
0ca727d776c is described below
commit 0ca727d776c3c75f7cdea09479b650fa8ac45f8e
Author: Walter Medvedeo <[email protected]>
AuthorDate: Wed Nov 27 17:03:05 2024 +0100
incubator-kie-tools-2774: Make the SonataFlow Operator to configure the
Jobs related Knative Eventing Triggers to consume the events in order (#2775)
---
.../internal/controller/knative/knative.go | 44 ++++++++++++-----
.../controller/platform/services/services.go | 57 ++++++++++++++--------
.../controller/profiles/common/object_creators.go | 2 +-
.../sonataflowplatform_controller_test.go | 3 --
.../sonataflow-operator/test/e2e/platform_test.go | 2 -
5 files changed, 69 insertions(+), 39 deletions(-)
diff --git
a/packages/sonataflow-operator/internal/controller/knative/knative.go
b/packages/sonataflow-operator/internal/controller/knative/knative.go
index d215a66398f..49f2540b2d4 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -22,11 +22,13 @@ package knative
import (
"context"
"fmt"
+ "strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
+ "knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
clienteventingv1
"knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
@@ -51,14 +53,16 @@ type Availability struct {
}
const (
- kSink = "K_SINK"
- knativeBundleVolume = "kne-bundle-volume"
- kCeOverRides = "K_CE_OVERRIDES"
- knativeServingGroup = "serving.knative.dev"
- knativeEventingGroup = "eventing.knative.dev"
- knativeEventingAPIVersion = "eventing.knative.dev/v1"
- knativeBrokerKind = "Broker"
- knativeSinkProvided = "SinkProvided"
+ kSink = "K_SINK"
+ knativeBundleVolume = "kne-bundle-volume"
+ kCeOverRides = "K_CE_OVERRIDES"
+ knativeServingGroup = "serving.knative.dev"
+ knativeEventingGroup = "eventing.knative.dev"
+ knativeEventingAPIVersion = "eventing.knative.dev/v1"
+ knativeBrokerKind = "Broker"
+ knativeSinkProvided = "SinkProvided"
+ KafkaKnativeEventingDeliveryOrder =
"kafka.eventing.knative.dev/delivery.order"
+ KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
)
func GetKnativeServingClient(cfg *rest.Config)
(clientservingv1.ServingV1Interface, error) {
@@ -132,19 +136,33 @@ func getDestinationWithNamespace(dest
*duckv1.Destination, namespace string) *du
return dest
}
-func ValidateBroker(name, namespace string) error {
+func ValidateBroker(name, namespace string) (*eventingv1.Broker, error) {
broker := &eventingv1.Broker{}
if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Name: name, Namespace: namespace}, broker); err != nil {
if errors.IsNotFound(err) {
- return fmt.Errorf("broker %s in namespace %s does not
exist", name, namespace)
+ return nil, fmt.Errorf("broker %s in namespace %s does
not exist", name, namespace)
}
- return err
+ return nil, err
}
cond := broker.Status.GetCondition(apis.ConditionReady)
if cond != nil && cond.Status == corev1.ConditionTrue {
- return nil
+ return broker, nil
+ }
+ return nil, fmt.Errorf("broker %s in namespace %s is not ready", name,
namespace)
+}
+
+// GetBrokerClass returns the broker class for a Knative Eventing Broker.
+func GetBrokerClass(broker *eventingv1.Broker) string {
+ if broker.Annotations == nil {
+ return ""
}
- return fmt.Errorf("broker %s in namespace %s is not ready", name,
namespace)
+ return broker.Annotations[eventing.BrokerClassKey]
+}
+
+// IsKafkaBroker returns true if the class for a Knative Eventing Broker
corresponds to a Kafka broker.
+func IsKafkaBroker(brokerClass string) bool {
+ // currently available kafka broker classes are "Kafka", and
"KafkaNamespaced", for safety ask for the substring "Kafka".
+ return strings.Contains(brokerClass, "Kafka")
}
func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
diff --git
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
index 97114d3e777..03427c548f6 100644
---
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
+++
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
@@ -575,12 +575,13 @@ func (d *DataIndexHandler) GetSourceBroker()
*duckv1.Destination {
return GetPlatformBroker(d.platform)
}
-func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName,
namespace, serviceName, tag, eventType, path string, platform
*operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
+func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations
map[string]string, brokerName, namespace, serviceName, tag, eventType, path
string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
return &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
- Name:
kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
- Namespace: namespace,
- Labels: labels,
+ Name:
kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID())),
+ Namespace: namespace,
+ Labels: labels,
+ Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
@@ -613,7 +614,9 @@ func (d *DataIndexHandler)
GenerateKnativeResources(platform *operatorapi.Sonata
if len(namespace) == 0 {
namespace = platform.Namespace
}
- if err := knative.ValidateBroker(brokerName, namespace); err != nil {
+ var brokerObject *eventingv1.Broker
+ var err error
+ if brokerObject, err = knative.ValidateBroker(brokerName, namespace);
err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
@@ -621,16 +624,18 @@ func (d *DataIndexHandler)
GenerateKnativeResources(platform *operatorapi.Sonata
}
return nil, event, err
}
+ annotations := make(map[string]string)
+ managedAnnotations := make(map[string]string)
+ addTriggerAnnotations(knative.GetBrokerClass(brokerObject),
managedAnnotations)
serviceName := d.GetServiceName()
return []client.Object{
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-error", "ProcessInstanceErrorDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-node", "ProcessInstanceNodeDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-sla", "ProcessInstanceSLADataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-state", "ProcessInstanceStateDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-variable", "ProcessInstanceVariableDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-definition", "ProcessDefinitionEvent",
constants.KogitoProcessDefinitionsEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-instance-multiple", "MultipleProcessInstanceDataEvent",
constants.KogitoProcessInstancesMultiEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs",
"JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
+ d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-error", "ProcessInstanceErrorDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
+ d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-node", "ProcessInstanceNodeDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
+ d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-state", "ProcessInstanceStateDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
+ d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-variable", "ProcessInstanceVariableDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
+ d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-definition", "ProcessDefinitionEvent",
constants.KogitoProcessDefinitionsEventsPath, platform),
+ d.newTrigger(lbl, annotations, brokerName, namespace,
serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent",
constants.KogitoProcessInstancesMultiEventsPath, platform),
+ d.newTrigger(lbl, managedAnnotations, brokerName, namespace,
serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}
func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
@@ -640,6 +645,12 @@ func (d JobServiceHandler) GetSourceBroker()
*duckv1.Destination {
return GetPlatformBroker(d.platform)
}
+func addTriggerAnnotations(brokerClass string, annotations map[string]string) {
+ if knative.IsKafkaBroker(brokerClass) {
+ annotations[knative.KafkaKnativeEventingDeliveryOrder] =
knative.KafkaKnativeEventingDeliveryOrderOrdered
+ }
+}
+
func (d JobServiceHandler) GetSink() *duckv1.Destination {
if d.platform.Spec.Services.JobService.Sink != nil {
return d.platform.Spec.Services.JobService.Sink
@@ -658,7 +669,9 @@ func (j *JobServiceHandler)
GenerateKnativeResources(platform *operatorapi.Sonat
if len(namespace) == 0 {
namespace = platform.Namespace
}
- if err := knative.ValidateBroker(brokerName, namespace); err !=
nil {
+ var brokerObject *eventingv1.Broker
+ var err error
+ if brokerObject, err = knative.ValidateBroker(brokerName,
namespace); err != nil {
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: WaitingKnativeEventing,
@@ -666,11 +679,14 @@ func (j *JobServiceHandler)
GenerateKnativeResources(platform *operatorapi.Sonat
}
return nil, event, err
}
+ annotations := make(map[string]string)
+ addTriggerAnnotations(knative.GetBrokerClass(brokerObject),
annotations)
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
- Name:
kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
- Namespace: namespace,
- Labels: lbl,
+ Name:
kmeta.ChildName("jobs-service-create-job-", string(platform.GetUID())),
+ Namespace: namespace,
+ Labels: lbl,
+ Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
@@ -695,9 +711,10 @@ func (j *JobServiceHandler)
GenerateKnativeResources(platform *operatorapi.Sonat
resultObjs = append(resultObjs, jobCreateTrigger)
jobDeleteTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
- Name:
kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
- Namespace: namespace,
- Labels: lbl,
+ Name:
kmeta.ChildName("jobs-service-delete-job-", string(platform.GetUID())),
+ Namespace: namespace,
+ Labels: lbl,
+ Annotations: annotations,
},
Spec: eventingv1.TriggerSpec{
Broker: brokerName,
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
index e61881ebf62..396c270fa68 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/common/object_creators.go
@@ -384,7 +384,7 @@ func TriggersCreator(workflow *operatorapi.SonataFlow, plf
*operatorapi.SonataFl
// No broker configured for the eventType. Skip and
will not create trigger for it.
continue
}
- if err := knative.ValidateBroker(brokerRef.Name,
brokerRef.Namespace); err != nil {
+ if _, err := knative.ValidateBroker(brokerRef.Name,
brokerRef.Namespace); err != nil {
return nil, err
}
// construct eventingv1.Trigger
diff --git
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
index 925d84a98f4..cb983e82f51 100644
---
a/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
+++
b/packages/sonataflow-operator/internal/controller/sonataflowplatform_controller_test.go
@@ -920,7 +920,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
validateTrigger(t, cl, "data-index-process-definition-",
ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-error-",
ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-node-",
ksp.Namespace, ksp, trigger)
- validateTrigger(t, cl, "data-index-process-sla-",
ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-state-",
ksp.Namespace, ksp, trigger)
validateTrigger(t, cl, "data-index-process-variable-",
ksp.Namespace, ksp, trigger)
@@ -1034,8 +1033,6 @@ func TestSonataFlowPlatformController(t *testing.T) {
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-node-",
ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
- validateTrigger(t, cl, "data-index-process-sla-",
ksp.Namespace, ksp, trigger)
- assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-state-",
ksp.Namespace, ksp, trigger)
assert.Equal(t, trigger.Spec.Broker, brokerNameDataIndexSource)
validateTrigger(t, cl, "data-index-process-variable-",
ksp.Namespace, ksp, trigger)
diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go
b/packages/sonataflow-operator/test/e2e/platform_test.go
index d2a9ce433ca..c7074176d85 100644
--- a/packages/sonataflow-operator/test/e2e/platform_test.go
+++ b/packages/sonataflow-operator/test/e2e/platform_test.go
@@ -242,7 +242,6 @@ var _ = Describe("Platform Use Cases :: ",
Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-",
constants.KogitoProcessInstancesEventsPath, targetNamespace,
"di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-",
constants.KogitoProcessInstancesEventsPath, targetNamespace,
"di-source")).NotTo(HaveOccurred())
- Expect(verifyTrigger(triggers, "data-index-process-sla-",
constants.KogitoProcessInstancesEventsPath, targetNamespace,
"di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-",
constants.KogitoProcessInstancesEventsPath, targetNamespace,
"di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-",
constants.KogitoProcessInstancesEventsPath, targetNamespace,
"di-source")).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers,
"data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath,
targetNamespace, "di-source")).NotTo(HaveOccurred())
@@ -316,7 +315,6 @@ var _ = Describe("Platform Use Cases :: ",
Label("platform"), Ordered, func() {
Expect(err).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-error-",
constants.KogitoProcessInstancesEventsPath, brokerNamespace,
brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-node-",
constants.KogitoProcessInstancesEventsPath, brokerNamespace,
brokerName)).NotTo(HaveOccurred())
- Expect(verifyTrigger(triggers, "data-index-process-sla-",
constants.KogitoProcessInstancesEventsPath, brokerNamespace,
brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-state-",
constants.KogitoProcessInstancesEventsPath, brokerNamespace,
brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers, "data-index-process-variable-",
constants.KogitoProcessInstancesEventsPath, brokerNamespace,
brokerName)).NotTo(HaveOccurred())
Expect(verifyTrigger(triggers,
"data-index-process-definition-", constants.KogitoProcessDefinitionsEventsPath,
brokerNamespace, brokerName)).NotTo(HaveOccurred())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]