ricardozanini commented on code in PR #467:
URL: 
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/467#discussion_r1606895565


##########
api/v1alpha08/sonataflowplatform_types.go:
##########
@@ -61,6 +65,15 @@ type SonataFlowPlatformSpec struct {
        Properties *PropertyPlatformSpec `json:"properties,omitempty"`
 }
 
+// PlatformEventingSpec specifies the broker to be used as the default sink or 
source

Review Comment:
   ```suggestion
   // PlatformEventingSpec specifies the Knative Eventing integration details 
in the platform.
   ```



##########
api/v1alpha08/sonataflowplatform_types.go:
##########
@@ -47,6 +48,9 @@ type SonataFlowPlatformSpec struct {
        // +optional
        // 
+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Services"
        Services *ServicesPlatformSpec `json:"services,omitempty"`
+       // Eventing defines a broker as the default broker when the workflow, 
Dataindex, or Jobservice does not have a sink or source specified.

Review Comment:
   ```suggestion
        // Eventing describes the information required for Knative Eventing 
integration in the platform.
   ```



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform 
broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                               return nil, fmt.Errorf("error reading the 
platform referred by the cluster platform")
+                       }
+                       if platform.Spec.Eventing != nil {
+                               return platform.Spec.Eventing.Broker, nil
+                       }
+               }
+       }
+       return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"

Review Comment:
   Can you keep the `const` at the top of the file? It's easier to maintain 
later.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform 
broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                               return nil, fmt.Errorf("error reading the 
platform referred by the cluster platform")
+                       }
+                       if platform.Spec.Eventing != nil {
+                               return platform.Spec.Eventing.Broker, nil
+                       }
+               }
+       }
+       return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef 
*duckv1.KReference) (*unstructured.Unstructured, error) {
+       dynamicClient, err := dynamic.NewForConfig(cfg)
+       if err != nil {
+               return nil, err
+       }
+       gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+       if err != nil {
+               return nil, err
+       }
+       resourceId := schema.GroupVersionResource{
+               Group:    gv.Group,
+               Version:  gv.Version,
+               Resource: kRef.Kind,
+       }
+       if len(kRef.Namespace) == 0 {
+               return nil, fmt.Errorf("namespace for knative resource %s is 
missing", kRef.Name)
+       }
+       list, err := 
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx, 
metav1.ListOptions{})
+       fmt.Printf("list:%v", list)

Review Comment:
   Debug leftovers, right? :D



##########
controllers/platform/services/knative.go:
##########
@@ -0,0 +1,81 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
+
+import (
+       "context"
+       "fmt"
+
+       
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
+
+       
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/runtime/schema"
+       "k8s.io/client-go/dynamic"
+       duckv1 "knative.dev/pkg/apis/duck/v1"
+)
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func KnativeResourceExists(ctx context.Context, c client.Client, dest 
*duckv1.Destination, nsDefault string) (*unstructured.Unstructured, error) {
+       dynamicClient, err := dynamic.NewForConfig(c.GetConfig())
+       if err != nil {
+               return nil, err
+       }
+       resourceId := schema.GroupVersionResource{
+               Group:    dest.Ref.Group,
+               Version:  dest.Ref.APIVersion,
+               Resource: dest.Ref.Kind,
+       }
+       namespace := dest.Ref.Namespace
+       if len(namespace) == 0 {
+               namespace = nsDefault
+       }
+       return dynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx, 
dest.Ref.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(ctx context.Context, c client.Client, dest 
*duckv1.Destination, namespace string) (bool, error) {
+       if dest == nil {
+               return false, fmt.Errorf("broker destinition is empty")
+       }
+       knativeAvail := GetKnativeAvailability(c)
+       if !knativeAvail.Eventing {
+               return false, fmt.Errorf("knative eventing is not deployed in 
the cluster")
+       }
+       obj, err := KnativeResourceExists(ctx, c, dest, namespace)
+       if err != nil {
+               return false, fmt.Errorf("failed to find knative resource %v", 
dest)
+       }
+       annotations := obj.GetAnnotations()
+       if len(annotations) > 0 {
+               if _, ok := annotations[knativeBrokerAnnotation]; ok {
+                       return true, nil
+               }
+       }
+       return false, nil
+}
+
+func GetKnativeAvailability(c client.Client) *knative.Availability {

Review Comment:
   I think we already have something in place: 
https://github.com/apache/incubator-kie-kogito-serverless-operator/blob/main/controllers/knative/knative.go#L67



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform 
broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {

Review Comment:
   Don't we already have a function to read the platform doing similar logic? 
One where we pass the `SonataFlowPlatform` and get back either itself or the cluster ref?



##########
controllers/profiles/common/object_creators.go:
##########
@@ -51,14 +54,17 @@ import (
 
 // ObjectCreator is the func that creates the initial reference object, if the 
object doesn't exist in the cluster, this one is created.
 // Can be used as a reference to keep the object immutable
-type ObjectCreator func(workflow *operatorapi.SonataFlow) (client.Object, 
error)
+type ObjectCreator func(workflow *operatorapi.SonataFlow, support 
*StateSupport) (client.Object, error)

Review Comment:
   Please review this API change; it's unnecessary if all you need is the 
`client`. It brings a lot of changes to the code base. The same goes for the `Ensurer`.



##########
controllers/profiles/common/properties/managed.go:
##########
@@ -183,10 +185,10 @@ func NewManagedPropertyHandler(workflow 
*operatorapi.SonataFlow, platform *opera
        return handler.withKogitoServiceUrl(), nil
 }
 
-// ApplicationManagedProperties immutable default application properties that 
can be used with any workflow based on Quarkus.
-// Alias for NewManagedPropertyHandler(workflow).Build()
-func ApplicationManagedProperties(workflow *operatorapi.SonataFlow, platform 
*operatorapi.SonataFlowPlatform) (string, error) {
-       p, err := NewManagedPropertyHandler(workflow, platform)
+// ImmutableApplicationProperties immutable default application properties 
that can be used with any workflow based on Quarkus.
+// Alias for NewAppPropertyHandler(workflow).Build()
+func ImmutableApplicationProperties(ctx context.Context, c client.Client, 
workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) 
(string, error) {

Review Comment:
   Why have you changed this function name?



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -190,8 +216,34 @@ func RolloutDeploymentIfCMChangedMutateVisitor(workflow 
*operatorapi.SonataFlow,
        return func(object client.Object) controllerutil.MutateFn {
                return func() error {
                        deployment := object.(*appsv1.Deployment)
+                       restoreKnativeVolumeAndVolumeMount(deployment)
                        err := 
kubeutil.AnnotateDeploymentConfigChecksum(workflow, deployment, userPropsCM, 
managedPropsCM)
                        return err
                }
        }
 }
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+       for i := 0; i < len(vols)-1; i++ {
+               if vols[i].Name == KnativeBundleVolume {
+                       vols[i], vols[i+1] = vols[i+1], vols[i]
+               }
+       }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+       for i := 0; i < len(mounts)-1; i++ {
+               if mounts[i].Name == KnativeBundleVolume {
+                       mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+               }
+       }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The 
volume and volume mount
+// must be in the end of the array to avoid repeadly restarting of the 
workflow pod
+func restoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
+       moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
+       for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ {
+               
moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts)
+       }
+}

Review Comment:
   Please move these knative specifics to a `knative.go` file in this module.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {

Review Comment:
   I think you can simplify this conditional with `pl != nil && 
pl.Spec.Eventing != nil`.



##########
api/v1alpha08/sonataflowplatform_types.go:
##########
@@ -61,6 +65,15 @@ type SonataFlowPlatformSpec struct {
        Properties *PropertyPlatformSpec `json:"properties,omitempty"`
 }
 
+// PlatformEventingSpec specifies the broker to be used as the default sink or 
source
+// +k8s:openapi-gen=true
+type PlatformEventingSpec struct {
+       // Broker to commute with workflow deployment

Review Comment:
   ```suggestion
        // Broker to commute with workflow deployment. It can be the default 
broker when the workflow, Dataindex, or Jobservice does not have a sink or 
source specified.
   ```



##########
controllers/platform/services/properties.go:
##########
@@ -157,11 +160,15 @@ func generateReactiveURL(postgresSpec 
*operatorapi.PersistencePostgreSQL, schema
 // with the Data Index. For the calculation this function considers if the 
Data Index is present in the
 // SonataFlowPlatform, if not present, no properties.
 // Never nil.
-func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, 
platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
+func GenerateDataIndexWorkflowProperties(ctx context.Context, client 
client.Client, workflow *operatorapi.SonataFlow, platform 
*operatorapi.SonataFlowPlatform) (*properties.Properties, error) {

Review Comment:
   We can avoid passing `client` here.



##########
controllers/platform/k8s.go:
##########
@@ -61,14 +62,14 @@ func (action *serviceAction) Handle(ctx context.Context, 
platform *operatorapi.S
                return nil, err
        }
 
-       psDI := services.NewDataIndexHandler(platform)
+       psDI := services.NewDataIndexHandler(ctx, action.client, platform)
        if psDI.IsServiceSetInSpec() {
                if err := createOrUpdateServiceComponents(ctx, action.client, 
platform, psDI); err != nil {
                        return nil, err
                }
        }
 
-       psJS := services.NewJobServiceHandler(platform)
+       psJS := services.NewJobServiceHandler(ctx, action.client, platform)

Review Comment:
   There's no need to pass the `client` reference anymore. We can access it 
from the `utils` package.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform 
broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                               return nil, fmt.Errorf("error reading the 
platform referred by the cluster platform")
+                       }
+                       if platform.Spec.Eventing != nil {
+                               return platform.Spec.Eventing.Broker, nil
+                       }
+               }
+       }
+       return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef 
*duckv1.KReference) (*unstructured.Unstructured, error) {
+       dynamicClient, err := dynamic.NewForConfig(cfg)
+       if err != nil {
+               return nil, err
+       }
+       gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+       if err != nil {
+               return nil, err
+       }
+       resourceId := schema.GroupVersionResource{
+               Group:    gv.Group,
+               Version:  gv.Version,
+               Resource: kRef.Kind,
+       }
+       if len(kRef.Namespace) == 0 {
+               return nil, fmt.Errorf("namespace for knative resource %s is 
missing", kRef.Name)
+       }
+       list, err := 
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx, 
metav1.ListOptions{})
+       fmt.Printf("list:%v", list)
+       return 
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx, 
kRef.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+       return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == 
"Broker"
+}
+
+func IsKnativeEnvInjected(ctx context.Context, c client.Client, 
deploymentName, namespace string) (bool, error) {

Review Comment:
   If this is a public function and the `Deployment` is ours, can you change 
the input param to receive a `SonataFlow` reference instead?



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform 
broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                               return nil, fmt.Errorf("error reading the 
platform referred by the cluster platform")
+                       }
+                       if platform.Spec.Eventing != nil {
+                               return platform.Spec.Eventing.Broker, nil
+                       }
+               }
+       }
+       return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef 
*duckv1.KReference) (*unstructured.Unstructured, error) {

Review Comment:
   Doesn't Knative already have a similar function that does this?



##########
controllers/platform/services/properties.go:
##########
@@ -183,19 +199,29 @@ func GenerateDataIndexWorkflowProperties(workflow 
*operatorapi.SonataFlow, platf
 // with the Job Service. For the calculation this function considers if the 
Job Service is present in the
 // SonataFlowPlatform, if not present, no properties.
 // Never nil.
-func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, 
platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
+func GenerateJobServiceWorkflowProperties(ctx context.Context, client 
client.Client, workflow *operatorapi.SonataFlow, platform 
*operatorapi.SonataFlowPlatform) (*properties.Properties, error) {

Review Comment:
   Same about `client`.



##########
controllers/platform/services/services.go:
##########
@@ -505,3 +576,167 @@ func mergeContainerPreservingEnvVars(dest 
*corev1.Container, source *corev1.Cont
        }
        return nil
 }
+
+// GetPlatformBroker gets the default broker for the platform.
+func GetPlatformBroker(platform *operatorapi.SonataFlowPlatform) 
*duckv1.Destination {
+       if platform != nil && platform.Spec.Eventing != nil && 
platform.Spec.Eventing.Broker != nil {
+               return platform.Spec.Eventing.Broker
+       }
+       return nil
+}
+
+func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
+       if d.platform != nil && d.platform.Spec.Services.DataIndex.Source != 
nil && d.platform.Spec.Services.DataIndex.Source.Ref != nil {
+               return d.platform.Spec.Services.DataIndex.Source
+       }
+       return GetPlatformBroker(d.platform)
+}
+
+func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, 
namespace, platformName, serviceName, tag, eventType, path string) 
*eventingv1.Trigger {
+       return &eventingv1.Trigger{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      fmt.Sprintf("%s-data-index-%s-trigger", 
platformName, tag),
+                       Namespace: namespace,
+                       Labels:    labels,
+               },
+               Spec: eventingv1.TriggerSpec{
+                       Broker: brokerName,
+                       Filter: &eventingv1.TriggerFilter{
+                               Attributes: eventingv1.TriggerFilterAttributes{
+                                       "type": eventType,
+                               },
+                       },
+                       Subscriber: duckv1.Destination{
+                               Ref: &duckv1.KReference{
+                                       Name:       serviceName,
+                                       Namespace:  namespace,
+                                       APIVersion: "v1",
+                                       Kind:       "Service",
+                               },
+                               URI: &apis.URL{
+                                       Path: path,
+                               },
+                       },
+               },
+       }
+}
+func (d *DataIndexHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
error) {
+       broker := d.GetSourceBroker()
+       if broker == nil {
+               return nil, nil // Nothing to do
+       }
+       brokerName := broker.Ref.Name
+       namespace := platform.Namespace
+       serviceName := d.GetServiceName()
+       return []client.Object{
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-variable", "ProcessInstanceVariableDataEvent", 
pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "jobs", "JobEvent", pathJobs)}, nil
+}
+
+func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
+       if d.platform.Spec.Services.JobService.Source != nil && 
d.platform.Spec.Services.JobService.Source.Ref != nil {
+               return d.platform.Spec.Services.JobService.Source
+       }
+       return GetPlatformBroker(d.platform)
+}
+
+func (d JobServiceHandler) GetSink() *duckv1.Destination {
+       if d.platform.Spec.Services.JobService.Sink != nil {
+               return d.platform.Spec.Services.JobService.Source
+       }
+       return GetPlatformBroker(d.platform)
+}
+
+func (j *JobServiceHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
error) {
+       broker := j.GetSourceBroker()
+       sink := j.GetSink()
+       namespace := platform.Namespace
+       resultObjs := []client.Object{}
+
+       if broker != nil {
+               brokerName := broker.Ref.Name
+               jobCreateTrigger := &eventingv1.Trigger{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "jobs-service-create-job-trigger",

Review Comment:
   What is the naming strategy for the Jobs Service instance? Maybe we should 
append that name here.



##########
workflowproj/operator.go:
##########
@@ -41,6 +41,8 @@ const (
        LabelService = metadata.Domain + "/service"
        // LabelWorkflow specialized label managed by the controller
        LabelWorkflow = metadata.Domain + "/workflow-app"
+       // LabelWorkflowNamespace specialized label managed by the controller 
indicating the namespace of the workflow
+       LabelWorkflowNamespace = metadata.Domain + "/workflow-namespace"

Review Comment:
   Can you refresh my memory here? Why do we need this? The 
`metadata.namespace` already takes care of this info.



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -96,25 +100,47 @@ func EnsureDeployment(original *appsv1.Deployment, object 
*appsv1.Deployment) er
        object.Spec.Replicas = original.Spec.Replicas
        object.Spec.Selector = original.Spec.Selector
        object.Labels = original.GetLabels()
+       object.Finalizers = original.Finalizers
 
        // Clean up the volumes, they are inherited from original, additional 
are added by other visitors
-       object.Spec.Template.Spec.Volumes = nil
+       // However, the knative mount path must be preserved
+       var kneVol *corev1.Volume = nil

Review Comment:
   Please keep this in a different mutator.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow 
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) 
(*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform 
broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                               return nil, fmt.Errorf("error reading the 
platform referred by the cluster platform")
+                       }
+                       if platform.Spec.Eventing != nil {
+                               return platform.Spec.Eventing.Broker, nil
+                       }
+               }
+       }
+       return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef 
*duckv1.KReference) (*unstructured.Unstructured, error) {
+       dynamicClient, err := dynamic.NewForConfig(cfg)
+       if err != nil {
+               return nil, err
+       }
+       gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+       if err != nil {
+               return nil, err
+       }
+       resourceId := schema.GroupVersionResource{
+               Group:    gv.Group,
+               Version:  gv.Version,
+               Resource: kRef.Kind,
+       }
+       if len(kRef.Namespace) == 0 {
+               return nil, fmt.Errorf("namespace for knative resource %s is 
missing", kRef.Name)
+       }
+       list, err := 
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx, 
metav1.ListOptions{})
+       fmt.Printf("list:%v", list)
+       return 
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx, 
kRef.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+       return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == 
"Broker"
+}
+
+func IsKnativeEnvInjected(ctx context.Context, c client.Client, 
deploymentName, namespace string) (bool, error) {
+       deployment := &appsv1.Deployment{}
+       if err := c.Get(ctx, types.NamespacedName{Name: deploymentName, 
Namespace: namespace}, deployment); err != nil {
+               if errors.IsNotFound(err) {
+                       return false, nil //deployment not found
+               }
+               return false, err
+       }
+       for _, env := range deployment.Spec.Template.Spec.Containers[0].Env {
+               if env.Name == KSink {
+                       return true, nil
+               }
+       }
+       return false, nil
+}
+
+func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool {
+       if plf.Spec.Services != nil {
+               if plf.Spec.Services.DataIndex != nil {
+                       return 
pointer.BoolDeref(plf.Spec.Services.DataIndex.Enabled, false)
+               }
+               return false
+       }
+       // Check if DataIndex is enabled in the platform status
+       if plf.Status.ClusterPlatformRef != nil && 
plf.Status.ClusterPlatformRef.Services != nil && 
plf.Status.ClusterPlatformRef.Services.DataIndexRef != nil && 
len(plf.Status.ClusterPlatformRef.Services.DataIndexRef.Url) > 0 {
+               return true
+       }
+       return false
+}
+
+func IsJobServiceEnabled(plf *operatorapi.SonataFlowPlatform) bool {
+       if plf.Spec.Services != nil {
+               if plf.Spec.Services.JobService != nil {
+                       return 
pointer.BoolDeref(plf.Spec.Services.JobService.Enabled, false)
+               }
+               return false
+       }
+       // Check if JobService is enabled in the platform status
+       if plf.Status.ClusterPlatformRef != nil && 
plf.Status.ClusterPlatformRef.Services != nil && 
plf.Status.ClusterPlatformRef.Services.JobServiceRef != nil && 
len(plf.Status.ClusterPlatformRef.Services.JobServiceRef.Url) > 0 {
+               return true
+       }
+       return false

Review Comment:
   Can you please move the functions `IsDataIndexEnabled` and 
`IsJobServiceEnabled` to the `controllers/platform` module?



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -154,7 +180,7 @@ func ServiceMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor {
        }
 }
 
-func ManagedPropertiesMutateVisitor(ctx context.Context, catalog discovery.ServiceCatalog,
+func ManagedPropertiesMutateVisitor(ctx context.Context, c client.Client, catalog discovery.ServiceCatalog,

Review Comment:
   You don't need to pass the `client` around.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil {
+               if pl.Spec.Eventing != nil {
+                       // no sink defined in the workflow, use the platform broker
+                       return pl.Spec.Eventing.Broker, nil
+               } else if pl.Status.ClusterPlatformRef != nil {
+                       // Find the platform referred by the cluster platform
+                       platform := &operatorapi.SonataFlowPlatform{}
+                       if err := c.Get(ctx, types.NamespacedName{Namespace: pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                               return nil, fmt.Errorf("error reading the platform referred by the cluster platform")
+                       }
+                       if platform.Spec.Eventing != nil {
+                               return platform.Spec.Eventing.Broker, nil
+                       }
+               }
+       }
+       return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef *duckv1.KReference) (*unstructured.Unstructured, error) {
+       dynamicClient, err := dynamic.NewForConfig(cfg)
+       if err != nil {
+               return nil, err
+       }
+       gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+       if err != nil {
+               return nil, err
+       }
+       resourceId := schema.GroupVersionResource{
+               Group:    gv.Group,
+               Version:  gv.Version,
+               Resource: kRef.Kind,
+       }
+       if len(kRef.Namespace) == 0 {
+               return nil, fmt.Errorf("namespace for knative resource %s is missing", kRef.Name)
+       }
+       list, err := dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx, metav1.ListOptions{})
+       fmt.Printf("list:%v", list)
+       return dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx, kRef.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+       return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == "Broker"
+}
+
+func IsKnativeEnvInjected(ctx context.Context, c client.Client, deploymentName, namespace string) (bool, error) {

Review Comment:
   You don't need to pass the `Client` around; you can use this helper instead:
https://github.com/apache/incubator-kie-kogito-serverless-operator/blob/main/utils/client.go#L25



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to