ricardozanini commented on code in PR #467:
URL: 
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/467#discussion_r1624932800


##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config) 
(*clienteventingv1.EventingV1Cli
        return clienteventingv1.NewForConfig(cfg)
 }
 
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, 
error) {
+       if discoveryClient == nil {
+               if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err 
!= nil {
+                       return nil, err
+               } else {
+                       discoveryClient = cli
+               }
+       }
+       return discoveryClient, nil
+}
+
+func SetDisvoveryClient(cli discovery.DiscoveryInterface) {

Review Comment:
   ```suggestion
   func SetDiscoveryClient(cli discovery.DiscoveryInterface) {
   ```



##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl 
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil && pl.Spec.Eventing != nil {
+               // no sink defined in the workflow, use the platform broker
+               return pl.Spec.Eventing.Broker, nil
+       } else if pl.Status.ClusterPlatformRef != nil {
+               // Find the platform referred by the cluster platform
+               platform := &operatorapi.SonataFlowPlatform{}
+               if err := utils.GetClient().Get(context.TODO(), 
types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                       return nil, fmt.Errorf("error reading the platform 
referred by the cluster platform")
+               }
+               if platform.Spec.Eventing != nil {
+                       return platform.Spec.Eventing.Broker, nil
+               }
+       }
+       return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+       return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == 
"Broker"

Review Comment:
   can you please move these to private `const`s?



##########
controllers/profiles/dev/states_dev.go:
##########
@@ -117,7 +117,7 @@ func (e *ensureRunningWorkflowState) Do(ctx 
context.Context, workflow *operatora
        }
        objs = append(objs, route)
 
-       if knativeObjs, err := 
common.NewKnativeEventingHandler(e.StateSupport).Ensure(ctx, workflow); err != 
nil {
+       if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport, 
pl).Ensure(ctx, workflow); err != nil {

Review Comment:
   Do you need to pass `StateSupport` here?



##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config) 
(*clienteventingv1.EventingV1Cli
        return clienteventingv1.NewForConfig(cfg)
 }
 
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, 
error) {

Review Comment:
   I'm not sure if this is the right place for this function since it's not 
related directly to Knative. We have two options: either move it to the 
`utils/kubernetes` package or make it private if only Knative is using it. But 
IIRC, our service discovery use cases also use it.



##########
api/condition_types.go:
##########
@@ -39,6 +39,8 @@ const (
        SucceedConditionType ConditionType = "Succeed"
        // BuiltConditionType describes the condition of a resource that needs 
to be build.
        BuiltConditionType ConditionType = "Built"
+       // DeployedConditionType describes the condition of a resource that 
needs to be deployed.
+       DeployedConditionType ConditionType = "Deployed"

Review Comment:
   I'd avoid adding one more condition to the status since it requires a whole 
new status handling in the controller reconciliation algorithm.
   
   We do the reconciliation based on the status in `CanReconcile`. By 
adding an additional condition type, we will then have to consider it in every 
cycle; otherwise we risk missing a combination of statuses and not reconciling 
a given object at all.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl 
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil && pl.Spec.Eventing != nil {
+               // no sink defined in the workflow, use the platform broker
+               return pl.Spec.Eventing.Broker, nil
+       } else if pl.Status.ClusterPlatformRef != nil {
+               // Find the platform referred by the cluster platform
+               platform := &operatorapi.SonataFlowPlatform{}
+               if err := utils.GetClient().Get(context.TODO(), 
types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                       return nil, fmt.Errorf("error reading the platform 
referred by the cluster platform")
+               }
+               if platform.Spec.Eventing != nil {
+                       return platform.Spec.Eventing.Broker, nil
+               }
+       }
+       return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+       return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == 
"Broker"
+}
+
+func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
+       for _, volume := range source.Volumes {
+               if volume.Name == KnativeBundleVolume {
+                       kubeutil.AddOrReplaceVolume(dest, volume)
+                       break
+               }
+       }
+       visitContainers(source, func(container *corev1.Container) {
+               visitContainers(dest, func(destContainer *corev1.Container) {
+                       for _, mount := range container.VolumeMounts {
+                               if mount.Name == KnativeBundleVolume {
+                                       
kubeutil.AddOrReplaceVolumeMount(destContainer, mount)
+                                       break
+                               }
+                       }
+                       for _, env := range container.Env {
+                               if env.Name == KSink || env.Name == 
KCeOverRides {
+                                       
kubeutil.AddOrReplaceEnvVar(destContainer, env)
+                               }
+                       }
+               })
+       })
+}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+       for i := 0; i < len(vols)-1; i++ {
+               if vols[i].Name == KnativeBundleVolume {
+                       vols[i], vols[i+1] = vols[i+1], vols[i]
+               }
+       }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+       for i := 0; i < len(mounts)-1; i++ {
+               if mounts[i].Name == KnativeBundleVolume {
+                       mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+               }
+       }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The 
volume and volume mount

Review Comment:
   ```suggestion
   // Knative Sinkbinding injects K_SINK env, a volume and volume mount. The 
volume and volume mount
   ```



##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config) 
(*clienteventingv1.EventingV1Cli
        return clienteventingv1.NewForConfig(cfg)
 }
 
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, 
error) {
+       if discoveryClient == nil {
+               if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err 
!= nil {
+                       return nil, err
+               } else {
+                       discoveryClient = cli
+               }
+       }
+       return discoveryClient, nil
+}
+
+func SetDisvoveryClient(cli discovery.DiscoveryInterface) {
+       discoveryClient = cli
+}
+
 func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
-       if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil {
+       if cli, err := GetDisvoveryClient(cfg); err != nil {

Review Comment:
   ```suggestion
        if cli, err := GetDiscoveryClient(cfg); err != nil {
   ```



##########
controllers/knative/knative.go:
##########
@@ -20,20 +20,37 @@
 package knative
 
 import (
+       "context"
+       "fmt"
+
+       operatorapi 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+       "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+       kubeutil 
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
+       appsv1 "k8s.io/api/apps/v1"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/discovery"
        "k8s.io/client-go/rest"
        clienteventingv1 
"knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
+       duckv1 "knative.dev/pkg/apis/duck/v1"
        clientservingv1 
"knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
 )
 
 var servingClient clientservingv1.ServingV1Interface
 var eventingClient clienteventingv1.EventingV1Interface
+var discoveryClient discovery.DiscoveryInterface
 
 type Availability struct {
        Eventing bool
        Serving  bool
 }
 
+const (
+       KSink               = "K_SINK"
+       KnativeBundleVolume = "kne-bundle-volume"
+       KCeOverRides        = "K_CE_OVERRIDES"
+)

Review Comment:
   Are we using this publicly somewhere else?



##########
controllers/profiles/common/constants/platform_services.go:
##########
@@ -20,21 +20,29 @@
 package constants
 
 const (
-       QuarkusHTTP = "quarkus-http"
-
+       QuarkusHTTP                      = "quarkus-http"
+       Post                             = "POST"
        ConfigMapWorkflowPropsVolumeName = "workflow-properties"
 
-       JobServiceRequestEventsURL       = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
-       JobServiceRequestEventsConnector = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
-       JobServiceStatusChangeEvents     = 
"kogito.jobs-service.http.job-status-change-events"
-       JobServiceStatusChangeEventsURL  = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
-       JobServiceURLProtocol            = "http"
-       JobServiceDataSourceReactiveURL  = "quarkus.datasource.reactive.url"
-       JobServiceJobEventsPath          = "/v2/jobs/events"
+       JobServiceRequestEventsURL            = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
+       JobServiceRequestEventsMethod         = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.method"
+       JobServiceRequestEventsConnector      = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
+       JobServiceStatusChangeEvents          = 
"kogito.jobs-service.http.job-status-change-events"
+       JobServiceStatusChangeEventsURL       = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
+       JobServiceStatusChangeEventsConnector = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.connector"
+       JobServiceStatusChangeEventsMethod    = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.method"
+       JobServiceURLProtocol                 = "http"
+       JobServiceDataSourceReactiveURL       = 
"quarkus.datasource.reactive.url"
+       JobServiceJobEventsPath               = "/v2/jobs/events"

Review Comment:
   You have this value being used in the file above.



##########
controllers/platform/services/knative.go:
##########
@@ -0,0 +1,48 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services

Review Comment:
   Can you please move these functions to `services.go`? I don't see any 
Knative-related code in them.



##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl 
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil && pl.Spec.Eventing != nil {
+               // no sink defined in the workflow, use the platform broker
+               return pl.Spec.Eventing.Broker, nil
+       } else if pl.Status.ClusterPlatformRef != nil {
+               // Find the platform referred by the cluster platform
+               platform := &operatorapi.SonataFlowPlatform{}
+               if err := utils.GetClient().Get(context.TODO(), 
types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                       return nil, fmt.Errorf("error reading the platform 
referred by the cluster platform")
+               }
+               if platform.Spec.Eventing != nil {
+                       return platform.Spec.Eventing.Broker, nil
+               }
+       }
+       return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+       return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind == 
"Broker"
+}
+
+func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
+       for _, volume := range source.Volumes {
+               if volume.Name == KnativeBundleVolume {
+                       kubeutil.AddOrReplaceVolume(dest, volume)
+                       break
+               }
+       }
+       visitContainers(source, func(container *corev1.Container) {
+               visitContainers(dest, func(destContainer *corev1.Container) {
+                       for _, mount := range container.VolumeMounts {
+                               if mount.Name == KnativeBundleVolume {
+                                       
kubeutil.AddOrReplaceVolumeMount(destContainer, mount)
+                                       break
+                               }
+                       }
+                       for _, env := range container.Env {
+                               if env.Name == KSink || env.Name == 
KCeOverRides {
+                                       
kubeutil.AddOrReplaceEnvVar(destContainer, env)
+                               }
+                       }
+               })
+       })
+}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+       for i := 0; i < len(vols)-1; i++ {
+               if vols[i].Name == KnativeBundleVolume {
+                       vols[i], vols[i+1] = vols[i+1], vols[i]
+               }
+       }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+       for i := 0; i < len(mounts)-1; i++ {
+               if mounts[i].Name == KnativeBundleVolume {
+                       mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+               }
+       }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The 
volume and volume mount
+// must be in the end of the array to avoid repeadly restarting of the 
workflow pod
+func RestoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
+       moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
+       visitContainers(&deployment.Spec.Template.Spec, func(container 
*corev1.Container) {
+               moveKnativeVolumeMountToEnd(container.VolumeMounts)
+       })
+}
+
+// ContainerVisitor is called with each container
+type ContainerVisitor func(container *corev1.Container)

Review Comment:
   Can you make this type private since it seems to be used only by 
`visitContainers`? Nice implementation btw! 👍 



##########
controllers/platform/services/services.go:
##########
@@ -505,3 +552,167 @@ func mergeContainerPreservingEnvVars(dest 
*corev1.Container, source *corev1.Cont
        }
        return nil
 }
+
+// GetPlatformBroker gets the default broker for the platform.
+func GetPlatformBroker(platform *operatorapi.SonataFlowPlatform) 
*duckv1.Destination {
+       if platform != nil && platform.Spec.Eventing != nil && 
platform.Spec.Eventing.Broker != nil {
+               return platform.Spec.Eventing.Broker
+       }
+       return nil
+}
+
+func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
+       if d.platform != nil && d.platform.Spec.Services.DataIndex.Source != 
nil && d.platform.Spec.Services.DataIndex.Source.Ref != nil {
+               return d.platform.Spec.Services.DataIndex.Source
+       }
+       return GetPlatformBroker(d.platform)
+}
+
+func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName, 
namespace, platformName, serviceName, tag, eventType, path string) 
*eventingv1.Trigger {
+       return &eventingv1.Trigger{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      fmt.Sprintf("%s-data-index-%s-trigger", 
platformName, tag),
+                       Namespace: namespace,
+                       Labels:    labels,
+               },
+               Spec: eventingv1.TriggerSpec{
+                       Broker: brokerName,
+                       Filter: &eventingv1.TriggerFilter{
+                               Attributes: eventingv1.TriggerFilterAttributes{
+                                       "type": eventType,
+                               },
+                       },
+                       Subscriber: duckv1.Destination{
+                               Ref: &duckv1.KReference{
+                                       Name:       serviceName,
+                                       Namespace:  namespace,
+                                       APIVersion: "v1",
+                                       Kind:       "Service",
+                               },
+                               URI: &apis.URL{
+                                       Path: path,
+                               },
+                       },
+               },
+       }
+}
+func (d *DataIndexHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
error) {
+       broker := d.GetSourceBroker()
+       if broker == nil {
+               return nil, nil // Nothing to do
+       }
+       brokerName := broker.Ref.Name
+       namespace := platform.Namespace
+       serviceName := d.GetServiceName()
+       return []client.Object{
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-variable", "ProcessInstanceVariableDataEvent", 
pathProcesses),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions),
+               d.newTrigger(lbl, brokerName, namespace, platform.Name, 
serviceName, "jobs", "JobEvent", pathJobs)}, nil
+}
+
+func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
+       if d.platform.Spec.Services.JobService.Source != nil && 
d.platform.Spec.Services.JobService.Source.Ref != nil {
+               return d.platform.Spec.Services.JobService.Source
+       }
+       return GetPlatformBroker(d.platform)
+}
+
+func (d JobServiceHandler) GetSink() *duckv1.Destination {
+       if d.platform.Spec.Services.JobService.Sink != nil {
+               return d.platform.Spec.Services.JobService.Sink
+       }
+       return GetPlatformBroker(d.platform)
+}
+
+func (j *JobServiceHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
error) {
+       broker := j.GetSourceBroker()
+       sink := j.GetSink()
+       namespace := platform.Namespace
+       resultObjs := []client.Object{}
+
+       if broker != nil {
+               brokerName := broker.Ref.Name
+               jobCreateTrigger := &eventingv1.Trigger{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      
fmt.Sprintf("%s-jobs-service-create-job-trigger", platform.Name),
+                               Namespace: namespace,
+                               Labels:    lbl,
+                       },
+                       Spec: eventingv1.TriggerSpec{
+                               Broker: brokerName,
+                               Filter: &eventingv1.TriggerFilter{
+                                       Attributes: 
eventingv1.TriggerFilterAttributes{
+                                               "type": "job.create",
+                                       },
+                               },
+                               Subscriber: duckv1.Destination{
+                                       Ref: &duckv1.KReference{
+                                               Name:       j.GetServiceName(),
+                                               Namespace:  namespace,
+                                               APIVersion: "v1",
+                                               Kind:       "Service",
+                                       },
+                                       URI: &apis.URL{
+                                               Path: "/v2/jobs/events",

Review Comment:
   Can you move this to a private const?



##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
                return result, nil
        }
 }
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl 
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+       if workflow == nil {
+               return nil, nil
+       }
+       if workflow.Spec.Sink != nil {
+               return workflow.Spec.Sink, nil
+       }
+       if pl != nil && pl.Spec.Eventing != nil {
+               // no sink defined in the workflow, use the platform broker
+               return pl.Spec.Eventing.Broker, nil
+       } else if pl.Status.ClusterPlatformRef != nil {
+               // Find the platform referred by the cluster platform
+               platform := &operatorapi.SonataFlowPlatform{}
+               if err := utils.GetClient().Get(context.TODO(), 
types.NamespacedName{Namespace: 
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name: 
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+                       return nil, fmt.Errorf("error reading the platform 
referred by the cluster platform")

Review Comment:
   Don't we have a function doing this in the `controllers/platform` module? If 
not, I think we can add it there.



##########
controllers/profiles/common/constants/platform_services.go:
##########
@@ -20,21 +20,29 @@
 package constants
 
 const (
-       QuarkusHTTP = "quarkus-http"
-
+       QuarkusHTTP                      = "quarkus-http"
+       Post                             = "POST"
        ConfigMapWorkflowPropsVolumeName = "workflow-properties"
 
-       JobServiceRequestEventsURL       = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
-       JobServiceRequestEventsConnector = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
-       JobServiceStatusChangeEvents     = 
"kogito.jobs-service.http.job-status-change-events"
-       JobServiceStatusChangeEventsURL  = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
-       JobServiceURLProtocol            = "http"
-       JobServiceDataSourceReactiveURL  = "quarkus.datasource.reactive.url"
-       JobServiceJobEventsPath          = "/v2/jobs/events"
+       JobServiceRequestEventsURL            = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
+       JobServiceRequestEventsMethod         = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.method"
+       JobServiceRequestEventsConnector      = 
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
+       JobServiceStatusChangeEvents          = 
"kogito.jobs-service.http.job-status-change-events"
+       JobServiceStatusChangeEventsURL       = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
+       JobServiceStatusChangeEventsConnector = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.connector"
+       JobServiceStatusChangeEventsMethod    = 
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.method"
+       JobServiceURLProtocol                 = "http"
+       JobServiceDataSourceReactiveURL       = 
"quarkus.datasource.reactive.url"
+       JobServiceJobEventsPath               = "/v2/jobs/events"
 
+       KogitoProcessEventsProtocol                 = "http"

Review Comment:
   Why not rename it to `DefaultHTTPProtocol` and have one const only? This 
seems the same as `JobServiceURLProtocol`.



##########
controllers/profiles/dev/object_creators_dev.go:
##########
@@ -95,7 +96,7 @@ func deploymentMutateVisitor(workflow 
*operatorapi.SonataFlow, plf *operatorapi.
        }
 }
 
-func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow) 
common.MutateVisitor {
+func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow, 
support *common.StateSupport) common.MutateVisitor {

Review Comment:
   ```suggestion
   func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow) 
common.MutateVisitor {
   ```
   
   I think we don't need this attribute anymore.



##########
controllers/profiles/common/ensurer.go:
##########
@@ -176,6 +224,11 @@ func ensureObject(ctx context.Context, workflow 
*operatorapi.SonataFlow, visitor
                                        return visitorErr
                                }
                        }
+                       if workflow.Namespace != object.GetNamespace() {
+                               // This is for Knative trigger in a different 
namespace
+                               // Set the finalizer for trigger cleanup when 
the workflow is deleted
+                               return setWorkflowFinalizer(ctx, c, workflow)

Review Comment:
   Can you please do another check to verify if this object is a knative one?



##########
controllers/profiles/dev/profile_dev_test.go:
##########
@@ -391,6 +404,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {
        workflow := 
test.GetSonataFlow(test.SonataFlowGreetingsWithStaticResourcesCR, t.Name())
 
        client := 
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow, 
configMap).WithStatusSubresource(workflow, configMap).Build()
+       knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient())

Review Comment:
   This naming will be replaced once you accept my changes. 
`SetDiscoveryClient`.



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -134,7 +138,7 @@ func EnsureKService(original *servingv1.Service, object 
*servingv1.Service) erro
        }
 
        // we do a merge to not keep changing the spec since k8s will set 
default values to the podSpec
-       return mergo.Merge(&object.Spec.Template.Spec.PodSpec, 
original.Spec.Template.Spec.PodSpec, mergo.WithOverride)
+       return mergo.Merge(&object.Spec.Template.Spec.PodSpec, 
original.Spec.Template.Spec.PodSpec /*, mergo.WithOverride*/)

Review Comment:
   Can you remove this comment? I'm assuming this is not required anymore.



##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config) 
(*clienteventingv1.EventingV1Cli
        return clienteventingv1.NewForConfig(cfg)
 }
 
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, 
error) {

Review Comment:
   ```suggestion
   func GetDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface, 
error) {
   ```



##########
controllers/profiles/dev/states_dev.go:
##########
@@ -62,7 +62,7 @@ func (e *ensureRunningWorkflowState) CanReconcile(workflow 
*operatorapi.SonataFl
 func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow 
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
        var objs []client.Object
 
-       flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx, 
workflow, ensureWorkflowDefConfigMapMutator(workflow))
+       flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx, 
workflow, ensureWorkflowDefConfigMapMutator(workflow, e.StateSupport))

Review Comment:
   ```suggestion
        flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx, 
workflow, ensureWorkflowDefConfigMapMutator(workflow))
   ```



##########
controllers/profiles/preview/states_preview.go:
##########
@@ -210,7 +211,15 @@ func (h *deployWithBuildWorkflowState) Do(ctx 
context.Context, workflow *operato
        }
 
        // didn't change, business as usual
-       return NewDeploymentReconciler(h.StateSupport, 
h.ensurers).reconcileWithImage(ctx, workflow, build.Status.ImageTag)
+       result, objs, err := NewDeploymentReconciler(h.StateSupport, 
h.ensurers).reconcileWithImage(ctx, workflow, build.Status.ImageTag)
+       if err != nil {
+               workflow.Status.Manager().MarkFalse(api.DeployedConditionType, 
api.DeploymentFailureReason, fmt.Sprintf("Error in deploy the workflow:%s", 
err))
+               _, err = h.PerformStatusUpdate(ctx, workflow)
+               return result, nil, err
+       }
+       workflow.Status.Manager().MarkTrue(api.DeployedConditionType)

Review Comment:
   So this is the only place we set this condition to `true`. Can't we keep 
using `Running`? Otherwise, if we were to introduce this new status, we need to 
make sure that we set it accordingly in every state.
   
   | State | Status | Description
   |------|--------|------------|
   |Running | TRUE | The workflow is running 
   |Deployed | TRUE | The workflow has been deployed
   |Built | FALSE | The workflow has not been built
   
   So, we need to justify and think about this matrix. If it's not deployed, 
then it's not running either, right? That's what I meant by introducing 
additional complexity to the algorithm by adding a new condition.



##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -96,8 +96,12 @@ func EnsureDeployment(original *appsv1.Deployment, object 
*appsv1.Deployment) er
        object.Spec.Replicas = original.Spec.Replicas
        object.Spec.Selector = original.Spec.Selector
        object.Labels = original.GetLabels()
+       object.Finalizers = original.Finalizers
 
        // Clean up the volumes, they are inherited from original, additional 
are added by other visitors
+       // However, the knative data (voulmes, volumes mounts) must be preserved

Review Comment:
   ```suggestion
        // However, the knative data (volumes, volumes mounts) must be preserved
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to