This is an automated email from the ASF dual-hosted git repository.

ricardozanini pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new f0ea73c3c95 kie-tools-2910: Update workflow metadata upon deployment 
events (#2911)
f0ea73c3c95 is described below

commit f0ea73c3c95e5546be5aa5ddfb83c27fc14a8558
Author: Walter Medvedeo <[email protected]>
AuthorDate: Tue Mar 25 16:48:20 2025 +0100

    kie-tools-2910: Update workflow metadata upon deployment events (#2911)
---
 .../api/v1alpha08/sonataflow_types.go              |   8 ++
 .../api/v1alpha08/zz_generated.deepcopy.go         |   8 ++
 packages/sonataflow-operator/cmd/main.go           |   7 ++
 .../sonataflow-operator.clusterserviceversion.yaml |   8 ++
 packages/sonataflow-operator/go.mod                |   2 +-
 .../module.yaml                                    |   4 +
 .../internal/controller/builder/builder.go         |   2 +-
 .../builder/kogitoserverlessbuild_manager.go       |   2 +-
 .../controller/eventing/workflowdef_events.go      |  73 +++++++++++
 .../internal/controller/knative/knative.go         |  24 +++-
 .../internal/controller/platform/platform.go       |  42 ++++---
 .../controller/platform/services/services.go       |   5 +-
 .../profiles/common/constants/reconcile.go         |  11 +-
 .../profiles/common/constants/workflows.go         |  30 ++---
 .../profiles/common/properties/managed.go          |  24 +++-
 .../controller/profiles/common/reconciler.go       |   2 +-
 .../internal/controller/profiles/dev/states_dev.go |   2 +-
 .../profiles/preview/deployment_handler.go         |  85 ++++++++++++-
 .../profiles/preview/deployment_handler_test.go    |   4 +
 .../controller/profiles/preview/states_preview.go  |   4 +-
 .../internal/controller/sonataflow_controller.go   | 136 +++++++++++++++++++--
 .../internal/controller/workflowdef/events.go      |  53 ++++++++
 .../sonataflow-operator/internal/manager/worker.go |  77 ++++++++++++
 packages/sonataflow-operator/operator.yaml         |  10 ++
 packages/sonataflow-operator/utils/events.go       |  65 ++++++++++
 25 files changed, 627 insertions(+), 61 deletions(-)

diff --git a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go 
b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
index 7b6091eaf3e..f43d8a11bd2 100644
--- a/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
+++ b/packages/sonataflow-operator/api/v1alpha08/sonataflow_types.go
@@ -202,6 +202,14 @@ type SonataFlowStatus struct {
        Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
        
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
        FlowCRC uint32 `json:"flowCRC,omitempty"`
+       
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="finalizerAttempts"
+       FinalizerAttempts int `json:"finalizerAttempts,omitempty"`
+       
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="finalizerSucceed"
+       FinalizerSucceed bool `json:"finalizerSucceed,omitempty"`
+       
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="lastTimeFinalizerAttempt"
+       LastTimeFinalizerAttempt *metav1.Time 
`json:"lastTimeFinalizerAttempt,omitempty"`
+       
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="lastTimeStatusNotified"
+       LastTimeStatusNotified *metav1.Time 
`json:"lastTimeStatusNotified,omitempty"`
 }
 
 // SonataFlowTriggerRef defines a trigger created for the SonataFlow.
diff --git 
a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go 
b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
index 2468c5a57fd..22faee96338 100644
--- a/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
+++ b/packages/sonataflow-operator/api/v1alpha08/zz_generated.deepcopy.go
@@ -1470,6 +1470,14 @@ func (in *SonataFlowStatus) DeepCopyInto(out 
*SonataFlowStatus) {
                *out = make([]SonataFlowTriggerRef, len(*in))
                copy(*out, *in)
        }
+       if in.LastTimeFinalizerAttempt != nil {
+               in, out := &in.LastTimeFinalizerAttempt, 
&out.LastTimeFinalizerAttempt
+               *out = (*in).DeepCopy()
+       }
+       if in.LastTimeStatusNotified != nil {
+               in, out := &in.LastTimeStatusNotified, 
&out.LastTimeStatusNotified
+               *out = (*in).DeepCopy()
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new SonataFlowStatus.
diff --git a/packages/sonataflow-operator/cmd/main.go 
b/packages/sonataflow-operator/cmd/main.go
index 0ee30c457ce..5a524863c3e 100644
--- a/packages/sonataflow-operator/cmd/main.go
+++ b/packages/sonataflow-operator/cmd/main.go
@@ -24,6 +24,8 @@ import (
        "flag"
        "os"
 
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
 
        prometheus 
"github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -92,6 +94,8 @@ func main() {
        flag.StringVar(&controllerCfgPath, "controller-cfg-path", "", "The 
controller config file path.")
        flag.Parse()
 
+       manager.SetOperatorStartTime()
+
        ctrl.SetLogger(klogr.New().WithName(controller.ComponentName))
 
        // if the enable-http2 flag is false (the default), http/2 should be 
disabled
@@ -152,6 +156,9 @@ func main() {
                os.Exit(1)
        }
 
+       // Initialize the worker used by the SonataFlow reconciliations to 
execute auxiliary async operations.
+       manager.InitializeSFCWorker(manager.SonataFlowControllerWorkerSize)
+
        if err = (&controller.SonataFlowReconciler{
                Client:   mgr.GetClient(),
                Scheme:   mgr.GetScheme(),
diff --git 
a/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
 
b/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
index 42a6fa178cd..228f10bd633 100644
--- 
a/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
+++ 
b/packages/sonataflow-operator/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
@@ -241,10 +241,18 @@ spec:
           - description: Endpoint is an externally accessible URL of the 
workflow
             displayName: endpoint
             path: endpoint
+          - displayName: finalizerAttempts
+            path: finalizerAttempts
+          - displayName: finalizerSucceed
+            path: finalizerSucceed
           - displayName: flowRevision
             path: flowCRC
+          - displayName: lastTimeFinalizerAttempt
+            path: lastTimeFinalizerAttempt
           - displayName: lastTimeRecoverAttempt
             path: lastTimeRecoverAttempt
+          - displayName: lastTimeStatusNotified
+            path: lastTimeStatusNotified
           - description: Platform displays which platform is being used by 
this workflow
             displayName: platform
             path: platform
diff --git a/packages/sonataflow-operator/go.mod 
b/packages/sonataflow-operator/go.mod
index 3840aaa295a..75594886016 100644
--- a/packages/sonataflow-operator/go.mod
+++ b/packages/sonataflow-operator/go.mod
@@ -14,6 +14,7 @@ require (
        github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api 
v0.0.0
        
github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder
 v0.0.0
        
github.com/apache/incubator-kie-tools/packages/sonataflow-operator/workflowproj 
v0.0.0
+       github.com/cloudevents/sdk-go/v2 v2.15.2
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
        github.com/go-logr/logr v1.4.2 // indirect
        github.com/imdario/mergo v0.3.16
@@ -47,7 +48,6 @@ require (
        github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
        github.com/cespare/xxhash/v2 v2.3.0 // indirect
        github.com/cloudevents/sdk-go/sql/v2 v2.13.0 // indirect
-       github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect
        github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // 
indirect
        github.com/emicklei/go-restful/v3 v3.11.0 // indirect
        github.com/evanphx/json-patch/v5 v5.9.0 // indirect
diff --git 
a/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
 
b/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
index 95f3252a510..49987caa106 100644
--- 
a/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
+++ 
b/packages/sonataflow-operator/images/modules/org.apache.kie.sonataflow.operatorBuilder/module.yaml
@@ -29,6 +29,10 @@ artifacts:
     path: "../../../../internal/controller"
     dest: /workspace/internal/controller
 
+  - name: manager
+    path: "../../../../internal/manager"
+    dest: /workspace/internal/manager
+
   - name: version
     path: "../../../../version"
     dest: /workspace/version
diff --git 
a/packages/sonataflow-operator/internal/controller/builder/builder.go 
b/packages/sonataflow-operator/internal/controller/builder/builder.go
index 25bad566d4b..a368152c93f 100644
--- a/packages/sonataflow-operator/internal/controller/builder/builder.go
+++ b/packages/sonataflow-operator/internal/controller/builder/builder.go
@@ -47,7 +47,7 @@ type BuildManager interface {
 }
 
 func NewBuildManager(ctx context.Context, client client.Client, cliConfig 
*rest.Config, targetName, targetNamespace string) (BuildManager, error) {
-       p, err := platform.GetActivePlatform(ctx, client, targetNamespace)
+       p, err := platform.GetActivePlatform(ctx, client, targetNamespace, true)
        if err != nil {
                if errors.IsNotFound(err) {
                        return nil, err
diff --git 
a/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
 
b/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
index be67b5a33fc..3387b5a4228 100644
--- 
a/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
+++ 
b/packages/sonataflow-operator/internal/controller/builder/kogitoserverlessbuild_manager.go
@@ -59,7 +59,7 @@ func (k *sonataFlowBuildManager) GetOrCreateBuild(workflow 
*operatorapi.SonataFl
        if err := k.client.Get(k.ctx, client.ObjectKeyFromObject(workflow), 
buildInstance); err != nil {
                if errors.IsNotFound(err) {
                        plat := &operatorapi.SonataFlowPlatform{}
-                       if plat, err = platform.GetActivePlatform(k.ctx, 
k.client, workflow.Namespace); err != nil {
+                       if plat, err = platform.GetActivePlatform(k.ctx, 
k.client, workflow.Namespace, true); err != nil {
                                return nil, err
                        }
                        workflowBuildTemplate := 
plat.Spec.Build.Template.DeepCopy()
diff --git 
a/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
 
b/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
new file mode 100644
index 00000000000..73a1651b541
--- /dev/null
+++ 
b/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package eventing
+
+import (
+       "context"
+       "fmt"
+
+       duckv1 "knative.dev/pkg/apis/duck/v1"
+
+       "k8s.io/klog/v2"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+
+       operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+// GetWorkflowDefinitionEventsTargetURL returns the target url that must be 
used to send the workflow definition events.
+func GetWorkflowDefinitionEventsTargetURL(cli client.Client, workflow 
*operatorapi.SonataFlow) (string, error) {
+       var err error
+       var sfp *operatorapi.SonataFlowPlatform
+       var sink *duckv1.Destination
+       var uri string
+
+       if sfp, err = platform.GetActivePlatform(context.Background(), cli, 
workflow.Namespace, false); err != nil {
+               return "", fmt.Errorf("failed to get active platform for 
workflow: %s, namespace: %s : %v", workflow.Name, workflow.Namespace, err)
+       }
+       if sfp == nil {
+               klog.V(log.D).Infof("No active platform was found to calculate 
the workflow definition events target url for workflow: %s, namespace: %s.", 
workflow.Name, workflow.Namespace)
+               return "", err
+       }
+       diHandler := services.NewDataIndexHandler(sfp)
+       if !diHandler.IsServiceEnabled() {
+               klog.V(log.D).Infof("DataIndex is not enabled for current 
workflow: %s, namespace: %s, neither in current platform: %s, or by a cluster 
platform reference.", workflow.Name, workflow.Namespace, sfp.Name)
+               return "", nil
+       }
+
+       // First check if the workflow is connected with the knative eventing 
system.
+       if sink, err = knative.GetWorkflowSink(workflow, sfp); err != nil {
+               return "", fmt.Errorf("failed to look for a potential sink 
configuration for workflow: %s, namespace: %s : %v", workflow.Name, 
workflow.Namespace, err)
+       }
+       if sink != nil {
+               // Workflow is connected with knative eventing by using an 
operator managed SinkBinding.
+               if sinkURI, err := 
knative.GetSinkBindingSinkURI(workflow.Name+"-sb", workflow.Namespace); err != 
nil {
+                       return "", err
+               } else {
+                       uri = sinkURI.String()
+               }
+       } else {
+               // Workflow is connected via direct http invocation with the DI.
+               uri = diHandler.GetServiceBaseUrl() + 
constants.KogitoProcessDefinitionsEventsPath
+       }
+       return uri, nil
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/knative/knative.go 
b/packages/sonataflow-operator/internal/controller/knative/knative.go
index 49f2540b2d4..3886363b696 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -116,8 +116,8 @@ func GetKnativeAvailability(cfg *rest.Config) 
(*Availability, error) {
        }
 }
 
-// getRemotePlatform returns the remote platfrom referred by a 
SonataFlowClusterPlatform
-func getRemotePlatform(pl *operatorapi.SonataFlowPlatform) 
(*operatorapi.SonataFlowPlatform, error) {
+// GetRemotePlatform returns the remote platform referred by a 
SonataFlowClusterPlatform if any.
+func GetRemotePlatform(pl *operatorapi.SonataFlowPlatform) 
(*operatorapi.SonataFlowPlatform, error) {
        if pl.Status.ClusterPlatformRef != nil {
                // Find the platform referred by the cluster platform
                platform := &operatorapi.SonataFlowPlatform{}
@@ -172,12 +172,15 @@ func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl 
*operatorapi.SonataFlo
        if workflow.Spec.Sink != nil {
                return getDestinationWithNamespace(workflow.Spec.Sink, 
workflow.Namespace), nil
        }
-       if pl != nil && pl.Spec.Eventing != nil && pl.Spec.Eventing.Broker != 
nil {
+       if pl == nil {
+               return nil, nil
+       }
+       if pl.Spec.Eventing != nil && pl.Spec.Eventing.Broker != nil {
                // no sink defined in the workflow, use the platform broker
                return getDestinationWithNamespace(pl.Spec.Eventing.Broker, 
pl.Namespace), nil
        }
        // Find the remote platform referred by the cluster platform
-       platform, err := getRemotePlatform(pl)
+       platform, err := GetRemotePlatform(pl)
        if err != nil {
                return nil, err
        }
@@ -290,3 +293,16 @@ func CheckKSinkInjected(name, namespace string) (bool, 
error) {
        }
        return false, nil // K_SINK has not been injected yet
 }
+
+// GetSinkBindingSinkURI returns the address of the sink referred by a 
SinkBinding.
+func GetSinkBindingSinkURI(name, namespace string) (*apis.URL, error) {
+       sb := &sourcesv1.SinkBinding{}
+       if err := utils.GetClient().Get(context.TODO(), 
types.NamespacedName{Name: name, Namespace: namespace}, sb); err != nil {
+               return nil, err
+       }
+       cond := sb.Status.GetCondition(apis.ConditionType(apis.ConditionReady))
+       if cond == nil || cond.Status != corev1.ConditionTrue {
+               return nil, fmt.Errorf("SinkBinding name: %s, namespace: %s is 
not ready", name, namespace)
+       }
+       return sb.Status.SinkURI, nil
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/platform/platform.go 
b/packages/sonataflow-operator/internal/controller/platform/platform.go
index c98e6ff9b5a..fe36aaea20e 100644
--- a/packages/sonataflow-operator/internal/controller/platform/platform.go
+++ b/packages/sonataflow-operator/internal/controller/platform/platform.go
@@ -119,23 +119,41 @@ func GetOperatorLockName(operatorID string) string {
 }
 
 // GetActivePlatform returns the currently installed active platform in the 
local namespace.
-func GetActivePlatform(ctx context.Context, c ctrl.Client, namespace string) 
(*operatorapi.SonataFlowPlatform, error) {
-       return getLocalPlatform(ctx, c, namespace, true)
+// The parameter createIfNotExists determines if such a platform must be created 
when it does not exist. Never nil when
+// createIfNotExists is true, unless an error is returned.
+func GetActivePlatform(ctx context.Context, c ctrl.Client, namespace string, 
createIfNotExists bool) (*operatorapi.SonataFlowPlatform, error) {
+       platform, err := getLocalPlatform(ctx, c, namespace, true)
+       if err != nil {
+               return nil, err
+       }
+       if platform != nil {
+               return platform, nil
+       }
+       klog.V(log.I).InfoS("No active SonataFlowPlatform was found in 
namespace", "Namespace", namespace)
+       if createIfNotExists {
+               klog.V(log.I).InfoS("Creating a default SonataFlowPlatform", 
"Namespace", namespace)
+               sfp := newEmptySonataFlowPlatform(namespace)
+               if err = CreateOrUpdateWithDefaults(ctx, sfp, false); err != 
nil {
+                       return nil, err
+               }
+               return sfp, nil
+       }
+       return nil, nil
 }
 
-// getLocalPlatform returns the currently installed platform or any platform 
existing in local namespace.
+// getLocalPlatform returns the currently installed active platform, or any 
platform, existing in local namespace when no
+// active platform exists. When the active parameter is true, only active 
platforms are considered.
+// In other cases, a non-active platform might be returned as a second option.
 func getLocalPlatform(ctx context.Context, c ctrl.Client, namespace string, 
active bool) (*operatorapi.SonataFlowPlatform, error) {
-       klog.V(log.D).InfoS("Finding available platforms")
-
+       klog.V(log.D).InfoS("Finding available platforms in namespace", 
"namespace", namespace)
        lst, err := listPrimaryPlatforms(ctx, c, namespace)
        if err != nil {
                return nil, err
        }
-
        for _, p := range lst.Items {
                platform := p // pin
                if IsActive(&platform) {
-                       klog.V(log.D).InfoS("Found active local build 
platform", "platform", platform.Name)
+                       klog.V(log.D).InfoS("Found active local platform", 
"platform", platform.Name)
                        return &platform, nil
                }
        }
@@ -143,16 +161,10 @@ func getLocalPlatform(ctx context.Context, c ctrl.Client, 
namespace string, acti
        if !active && len(lst.Items) > 0 {
                // does not require the platform to be active, just return one 
if present
                res := lst.Items[0]
-               klog.V(log.D).InfoS("Found local build platform", "platform", 
res.Name)
+               klog.V(log.D).InfoS("Found non-active local platform", 
"platform", res.Name)
                return &res, nil
        }
-       klog.V(log.I).InfoS("Not found a local build platform", "Namespace", 
namespace)
-       klog.V(log.I).InfoS("Creating a default SonataFlowPlatform", 
"Namespace", namespace)
-       sfp := newEmptySonataFlowPlatform(namespace)
-       if err = CreateOrUpdateWithDefaults(ctx, sfp, false); err != nil {
-               return nil, err
-       }
-       return sfp, nil
+       return nil, nil
 }
 
 func newEmptySonataFlowPlatform(namespace string) 
*operatorapi.SonataFlowPlatform {
diff --git 
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
 
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
index 6ef5ac4204a..6ddb220a4c1 100644
--- 
a/packages/sonataflow-operator/internal/controller/platform/services/services.go
+++ 
b/packages/sonataflow-operator/internal/controller/platform/services/services.go
@@ -112,6 +112,9 @@ type PlatformServiceHandler interface {
        // SetServiceUrlInWorkflowStatus sets the service url in a workflow's 
status.
        SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow)
 
+       // GetServiceSource returns the source Broker configured for the given 
service by applying the following precedence rule.
+       // The source declared in the given service definition is returned 
first, if any, otherwise a source declared in the
+       // service platform is returned, if any.
        GetServiceSource() *duckv1.Destination
 
        // Check if K_SINK has injected for Job Service. No Op for Data Index
@@ -690,7 +693,7 @@ func (d *DataIndexHandler) 
GenerateKnativeResources(platform *operatorapi.Sonata
                d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-node", "ProcessInstanceNodeDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
                d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-state", "ProcessInstanceStateDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
                d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-variable", "ProcessInstanceVariableDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
-               d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-definition", "ProcessDefinitionEvent", 
constants.KogitoProcessDefinitionsEventsPath, platform),
+               d.newTrigger(lbl, managedAnnotations, brokerName, namespace, 
serviceName, "process-definition", "ProcessDefinitionEvent", 
constants.KogitoProcessDefinitionsEventsPath, platform),
                d.newTrigger(lbl, annotations, brokerName, namespace, 
serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", 
constants.KogitoProcessInstancesMultiEventsPath, platform),
                d.newTrigger(lbl, managedAnnotations, brokerName, namespace, 
serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
 }
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
 
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
index a2a4a7ac1df..fb86e55b601 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/reconcile.go
@@ -30,6 +30,13 @@ const (
        // RequeueRecoverDeploymentErrorInterval interval between recovering 
from failures
        RequeueRecoverDeploymentErrorInterval = RecoverDeploymentErrorInterval 
* time.Minute
        RecoverDeploymentErrorInterval        = 10
-
-       DefaultHTTPWorkflowPortInt = 8080
+       DefaultHTTPWorkflowPortInt            = 8080
+       // MaxWorkflowFinalizerAttempts how many times the operator will try to 
execute a SonataFlow CRD finalizer.
+       MaxWorkflowFinalizerAttempts = 3
+       // WorkflowFinalizerRetryInterval interval between SonataFlow CRD 
finalizer execution attempts.
+       WorkflowFinalizerRetryInterval = 5 * time.Second
+       // WorkflowFinalizerSchedulingRetryInterval interval for the operator 
to retry scheduling a finalizer after a scheduling failure.
+       WorkflowFinalizerSchedulingRetryInterval = 5 * time.Second
+       // EventDeliveryTimeout delivery timeout for the cloud events produced 
by the operator.
+       EventDeliveryTimeout = 30 * time.Second
 )
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
 
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
index f0fca0a4b63..11be4c5270b 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/common/constants/workflows.go
@@ -18,18 +18,20 @@
 package constants
 
 const (
-       MicroprofileServiceCatalogPropertyPrefix = 
"org.kie.kogito.addons.discovery."
-       KogitoOutgoingEventsURL                  = 
"mp.messaging.outgoing.kogito_outgoing_stream.url"
-       KogitoOutgoingEventsConnector            = 
"mp.messaging.outgoing.kogito_outgoing_stream.connector"
-       KogitoIncomingEventsConnector            = 
"mp.messaging.incoming.kogito_incoming_stream.connector"
-       KogitoIncomingEventsPath                 = 
"mp.messaging.incoming.kogito_incoming_stream.path"
-       KnativeHealthEnabled                     = 
"org.kie.kogito.addons.knative.eventing.health-enabled"
-       KnativeInjectedEnvVar                    = "${K_SINK}"
-       TriggerFinalizer                         = "trigger-deletion"
-       QuarkusDevUICorsEnabled                  = "quarkus.dev-ui.cors.enabled"
-       QuarkusHttpCors                          = "quarkus.http.cors"
-       QuarkusHttpCorsOrigins                   = "quarkus.http.cors.origins"
-       KogitoEventsGrouping                     = "kogito.events.grouping"
-       KogitoEventsGroupingBinary               = 
"kogito.events.grouping.binary"
-       KogitoEventsGroupingCompress             = 
"kogito.events.grouping.compress"
+       MicroprofileServiceCatalogPropertyPrefix      = 
"org.kie.kogito.addons.discovery."
+       KogitoOutgoingEventsURL                       = 
"mp.messaging.outgoing.kogito_outgoing_stream.url"
+       KogitoOutgoingEventsConnector                 = 
"mp.messaging.outgoing.kogito_outgoing_stream.connector"
+       KogitoIncomingEventsConnector                 = 
"mp.messaging.incoming.kogito_incoming_stream.connector"
+       KogitoIncomingEventsPath                      = 
"mp.messaging.incoming.kogito_incoming_stream.path"
+       KnativeHealthEnabled                          = 
"org.kie.kogito.addons.knative.eventing.health-enabled"
+       KnativeInjectedEnvVar                         = "${K_SINK}"
+       TriggerFinalizer                              = "trigger-deletion"
+       WorkflowFinalizer                             = "workflow-deletion"
+       QuarkusDevUICorsEnabled                       = 
"quarkus.dev-ui.cors.enabled"
+       QuarkusHttpCors                               = "quarkus.http.cors"
+       QuarkusHttpCorsOrigins                        = 
"quarkus.http.cors.origins"
+       KogitoEventsGrouping                          = "kogito.events.grouping"
+       KogitoEventsGroupingBinary                    = 
"kogito.events.grouping.binary"
+       KogitoEventsGroupingCompress                  = 
"kogito.events.grouping.compress"
+       SendWorkflowDefinitionsStatusUpdateEventError = "An error was produced 
while sending a workflow definition status change event."
 )
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
 
b/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
index 1587f5440b5..545c2c47019 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/common/properties/managed.go
@@ -116,13 +116,27 @@ func (a *managedPropertyHandler) Build() string {
 // withKogitoServiceUrl adds the property kogitoServiceUrlProperty to the 
application properties.
 // See Service Discovery 
https://kubernetes.io/docs/concepts/services-networking/service/#dns
 func (a *managedPropertyHandler) withKogitoServiceUrl() ManagedPropertyHandler 
{
-       var kogitoServiceUrl string
-       if len(a.workflow.Namespace) > 0 {
-               kogitoServiceUrl = fmt.Sprintf("%s://%s.%s", 
constants.DefaultHTTPProtocol, a.workflow.Name, a.workflow.Namespace)
+       return a.addDefaultManagedProperty(constants.KogitoServiceURLProperty, 
GetKogitoServiceUrl(a.workflow))
+}
+
+func GetKogitoServiceUrl(workflow *operatorapi.SonataFlow) string {
+       return GetKogitoServiceUrlWithNameAndNamespace(workflow.Name, 
workflow.Namespace)
+}
+
+func GetKogitoServiceUrlWithNameAndNamespace(name, namespace string) string {
+       if len(namespace) > 0 {
+               return fmt.Sprintf("%s://%s.%s", constants.DefaultHTTPProtocol, 
name, namespace)
        } else {
-               kogitoServiceUrl = fmt.Sprintf("%s://%s", 
constants.DefaultHTTPProtocol, a.workflow.Name)
+               return fmt.Sprintf("%s://%s", constants.DefaultHTTPProtocol, 
name)
        }
-       return a.addDefaultManagedProperty(constants.KogitoServiceURLProperty, 
kogitoServiceUrl)
+}
+
+func GetWorkflowEndpointUrl(workflow *operatorapi.SonataFlow) string {
+       return GetWorkflowEndpointUrlWithNameAndNamespace(workflow.Name, 
workflow.Namespace)
+}
+
+func GetWorkflowEndpointUrlWithNameAndNamespace(name, namespace string) string 
{
+       return GetKogitoServiceUrlWithNameAndNamespace(name, namespace) + "/" + 
name
 }
 
 // withKafkaHealthCheckDisabled adds the property kafkaSmallRyeHealthProperty 
to the application properties.
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
 
b/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
index e3967d8cfc4..93b582a28fd 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/common/reconciler.go
@@ -53,7 +53,7 @@ type StateSupport struct {
 // PerformStatusUpdate updates the SonataFlow Status conditions
 func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow 
*operatorapi.SonataFlow) (bool, error) {
        var err error
-       pl, err := platform.GetActivePlatform(ctx, s.C, workflow.Namespace)
+       pl, err := platform.GetActivePlatform(ctx, s.C, workflow.Namespace, 
true)
        if err != nil {
                return false, err
        }
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go 
b/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
index 4ae5443ee69..457dc34cb2e 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/dev/states_dev.go
@@ -71,7 +71,7 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, 
workflow *operatora
 
        devBaseContainerImage := workflowdef.GetDefaultWorkflowDevModeImageTag()
        // check if the Platform available
-       pl, err := platform.GetActivePlatform(context.TODO(), e.C, 
workflow.Namespace)
+       pl, err := platform.GetActivePlatform(context.TODO(), e.C, 
workflow.Namespace, true)
        if err != nil {
                return ctrl.Result{Requeue: false}, objs, err
        }
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
 
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
index 61a39162f74..f1285c8baae 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler.go
@@ -19,6 +19,26 @@ package preview
 
 import (
        "context"
+       "fmt"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/eventing"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+
+       "k8s.io/client-go/util/retry"
+
+       "k8s.io/apimachinery/pkg/types"
+
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/properties"
+
+       "k8s.io/klog/v2"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/workflowdef"
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
 
        v1 "k8s.io/api/core/v1"
        "sigs.k8s.io/controller-runtime/pkg/client"
@@ -32,7 +52,6 @@ import (
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/monitoring"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common"
-       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils"
 )
 
@@ -58,6 +77,7 @@ func (d *DeploymentReconciler) reconcileWithImage(ctx 
context.Context, workflow
                return reconcile.Result{Requeue: false}, nil, err
        }
 
+       previousStatus := workflow.Status
        // Ensure objects
        result, objs, err := d.ensureObjects(ctx, workflow, image)
        if err != nil || result.Requeue {
@@ -70,9 +90,11 @@ func (d *DeploymentReconciler) reconcileWithImage(ctx 
context.Context, workflow
                return reconcile.Result{Requeue: false}, nil, err
        }
 
+       d.updateLastTimeStatusNotified(workflow, previousStatus)
        if _, err := d.PerformStatusUpdate(ctx, workflow); err != nil {
                return reconcile.Result{Requeue: false}, nil, err
        }
+       d.scheduleWorkflowStatusChangeNotification(ctx, workflow)
        return result, objs, nil
 }
 
@@ -95,7 +117,7 @@ func (d *DeploymentReconciler) 
ensureKnativeServingRequired(workflow *operatorap
 }
 
 func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow 
*operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, 
error) {
-       pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace)
+       pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace, true)
        userPropsCM, _, err := d.ensurers.userPropsConfigMap.Ensure(ctx, 
workflow)
        if err != nil {
                workflow.Status.Manager().MarkFalse(api.RunningConditionType, 
api.ExternalResourcesNotFoundReason, "Unable to retrieve the user properties 
config map")
@@ -189,3 +211,62 @@ func (d *DeploymentReconciler) 
deploymentModelMutateVisitors(
                common.RestoreDeploymentVolumeAndVolumeMountMutateVisitor(),
                common.RolloutDeploymentIfCMChangedMutateVisitor(workflow, 
userPropsCM, managedPropsCM)}
 }
+
+// updateLastTimeStatusNotified clears workflow.Status.LastTimeStatusNotified when a new status
+// notification has to be produced. That is the case when:
+//   - the status of the Running condition differs between previousStatus and the current
+//     workflow status, or
+//   - the last notification was registered before the operator start time.
+//
+// When previousStatus has no Running condition there is nothing to compare against and the
+// status is considered unchanged. When the condition existed before but is now missing, the
+// status is considered changed (this case previously dereferenced a nil condition).
+func (d *DeploymentReconciler) updateLastTimeStatusNotified(workflow *operatorapi.SonataFlow, previousStatus operatorapi.SonataFlowStatus) {
+       previousRunningCondition := previousStatus.GetCondition(api.RunningConditionType)
+       currentRunningCondition := workflow.Status.GetCondition(api.RunningConditionType)
+
+       var runningStatusChanged bool
+       switch {
+       case previousRunningCondition == nil:
+               runningStatusChanged = false
+       case currentRunningCondition == nil:
+               runningStatusChanged = true
+       default:
+               runningStatusChanged = previousRunningCondition.Status != currentRunningCondition.Status
+       }
+
+       lastNotified := workflow.Status.LastTimeStatusNotified
+       notifiedBeforeOperatorStart := lastNotified != nil && lastNotified.Time.Before(manager.GetOperatorStartTime())
+
+       if runningStatusChanged || notifiedBeforeOperatorStart {
+               // A nil value is the marker scheduleWorkflowStatusChangeNotification looks for.
+               workflow.Status.LastTimeStatusNotified = nil
+       }
+}
+
+// scheduleWorkflowStatusChangeNotification enqueues, on the SonataFlow controller worker, the
+// delivery of a workflow status change notification. Nothing is scheduled when
+// workflow.Status.LastTimeStatusNotified is already set. The ctx parameter is currently unused:
+// the asynchronous notification creates its own contexts.
+func (d *DeploymentReconciler) scheduleWorkflowStatusChangeNotification(ctx context.Context, workflow *operatorapi.SonataFlow) {
+       if workflow.Status.LastTimeStatusNotified != nil {
+               return
+       }
+       // Capture the identifiers by value so the asynchronous task does not read the workflow
+       // object after the reconciler has moved on.
+       wfName, wfNamespace := workflow.Name, workflow.Namespace
+       manager.GetSFCWorker().RunAsync(func() {
+               if err := notifyWorkflowStatusChange(d.C, wfName, wfNamespace); err != nil {
+                       // ErrorS expects alternating key/value pairs after the message.
+                       klog.V(log.E).ErrorS(err, "Failed to notify workflow status change, controller will schedule a new retry.", "workflow", wfName, "namespace", wfNamespace)
+               }
+       })
+}
+
+// notifyWorkflowStatusChange sends a workflow definition availability event for the workflow
+// identified by wfName and wfNamespace and, when the delivery succeeds, registers the current
+// time in Status.LastTimeStatusNotified. The whole sequence runs inside retry.RetryOnConflict,
+// so it is repeated when an attempt ends with a conflict error.
+//
+// When no events target URL is resolved for the workflow, nothing is sent and nil is returned.
+func notifyWorkflowStatusChange(cli client.Client, wfName, wfNamespace string) error {
+       key := types.NamespacedName{Name: wfName, Namespace: wfNamespace}
+       return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+               fetched := &operatorapi.SonataFlow{}
+               if getErr := cli.Get(context.Background(), key, fetched); getErr != nil {
+                       return getErr
+               }
+               wf := fetched.DeepCopy()
+               // NOTE(review): relies on IsTrue tolerating a nil condition - confirm in the API package.
+               isAvailable := wf.Status.GetCondition(api.RunningConditionType).IsTrue()
+
+               targetURL, urlErr := eventing.GetWorkflowDefinitionEventsTargetURL(cli, wf)
+               if urlErr != nil {
+                       return fmt.Errorf("failed to get workflow definition events target url to send the workflow definition status update event: %v", urlErr)
+               }
+               if len(targetURL) == 0 {
+                       klog.V(log.D).Infof("No enabled DataIndex, nor Broker, nor Sink configuration was found to send the workflow definition status update event for workflow: %s, namespace: %s", wf.Name, wf.Namespace)
+                       return nil
+               }
+
+               sendCtx, cancel := context.WithTimeout(context.Background(), constants.EventDeliveryTimeout)
+               defer cancel()
+               evt := workflowdef.NewWorkflowDefinitionAvailabilityEvent(wf, workflowdef.SonataFlowOperatorSource, properties.GetWorkflowEndpointUrl(wf), isAvailable)
+               if sendErr := utils.SendCloudEventWithContext(evt, sendCtx, targetURL); sendErr != nil {
+                       // The controller programs a new notification based on the LastTimeStatusNotified.
+                       return fmt.Errorf("failed to send workflow definition status update event: %v", sendErr)
+               }
+
+               // Register the LastTimeStatusNotified, the controller knows how to react based on that value.
+               now := metav1.Now()
+               wf.Status.LastTimeStatusNotified = &now
+               return cli.Status().Update(context.Background(), wf)
+       })
+}
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
 
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
index dbc5e405b2d..2cdac64d0ca 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/preview/deployment_handler_test.go
@@ -21,6 +21,8 @@ import (
        "context"
        "testing"
 
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
        "github.com/magiconair/properties"
        "github.com/stretchr/testify/assert"
        v1 "k8s.io/api/apps/v1"
@@ -69,6 +71,8 @@ func Test_CheckDeploymentModelIsKnative(t *testing.T) {
 }
 
 func Test_CheckPodTemplateChangesReflectDeployment(t *testing.T) {
+       manager.InitializeSFCWorker(manager.SonataFlowControllerWorkerSize)
+       manager.SetOperatorStartTime()
        workflow := test.GetBaseSonataFlowWithPreviewProfile(t.Name())
 
        client := test.NewSonataFlowClientBuilder().
diff --git 
a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
 
b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
index 492e5aab733..55a50e5778b 100644
--- 
a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
+++ 
b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
@@ -63,7 +63,7 @@ func (h *newBuilderState) CanReconcile(workflow 
*operatorapi.SonataFlow) bool {
 }
 
 func (h *newBuilderState) Do(ctx context.Context, workflow 
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
-       pl, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace)
+       pl, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace, 
true)
        if err != nil {
                if errors.IsNotFound(err) {
                        
workflow.Status.Manager().MarkFalse(api.BuiltConditionType, 
api.WaitingForPlatformReason,
@@ -198,7 +198,7 @@ func (h *deployWithBuildWorkflowState) Do(ctx 
context.Context, workflow *operato
        // Guard to avoid errors while getting a new builder manager.
        // Maybe we can do typed errors in the buildManager and
        // have something like sonataerr.IsPlatformNotFound(err) instead.
-       _, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace)
+       _, err := platform.GetActivePlatform(ctx, h.C, workflow.Namespace, true)
        if err != nil {
                workflow.Status.Manager().MarkFalse(api.RunningConditionType, 
api.WaitingForPlatformReason,
                        "No active Platform for namespace %s so the 
resWorkflowDef cannot be deployed. Waiting for an active platform", 
workflow.Namespace)
diff --git 
a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go 
b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
index 8167f42dfa1..55d29dae56d 100644
--- a/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
+++ b/packages/sonataflow-operator/internal/controller/sonataflow_controller.go
@@ -23,17 +23,32 @@ import (
        "context"
        "fmt"
 
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/eventing"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils"
+
+       "k8s.io/client-go/util/retry"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/properties"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/workflowdef"
+
+       "sigs.k8s.io/controller-runtime/pkg/predicate"
+
        sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
        servingv1 "knative.dev/serving/pkg/apis/serving/v1"
 
+       "k8s.io/klog/v2"
+
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/monitoring"
 
-       "k8s.io/klog/v2"
-
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/metadata"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
-       profiles 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/factory"
+       profilesfactory 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/factory"
 
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
@@ -104,14 +119,19 @@ func (r *SonataFlowReconciler) Reconcile(ctx 
context.Context, req ctrl.Request)
        }
 
        r.setDefaults(workflow)
-       // If the workflow is being deleted, clean up the triggers on a 
different namespace
-       if workflow.DeletionTimestamp != nil && 
controllerutil.ContainsFinalizer(workflow, constants.TriggerFinalizer) {
-               err := r.cleanupTriggers(ctx, workflow)
-               if err != nil {
-                       klog.V(log.E).ErrorS(err, "Failed to clean up triggers 
for workflow %s", workflow.Name)
-                       return ctrl.Result{}, err
+       // If the workflow is being deleted, execute the associated finalizers
+       if workflow.DeletionTimestamp != nil {
+               return r.applyFinalizers(ctx, workflow)
+       }
+
+       // If first recon cycle, add the WorkflowFinalizer
+       if !profiles.IsDevProfile(workflow) {
+               if controllerutil.AddFinalizer(workflow, 
constants.WorkflowFinalizer) {
+                       if err := r.Client.Update(ctx, workflow); err != nil {
+                               klog.V(log.E).ErrorS(err, "Failed to add 
workflow finalizer.", "workflow", "namespace", "finalizer", workflow.Name, 
workflow.Namespace, constants.WorkflowFinalizer)
+                               return ctrl.Result{}, err
+                       }
                }
-               return ctrl.Result{}, nil
        }
 
        // Only process resources assigned to the operator
@@ -119,7 +139,7 @@ func (r *SonataFlowReconciler) Reconcile(ctx 
context.Context, req ctrl.Request)
                klog.V(log.I).InfoS("Ignoring request because resource is not 
assigned to current operator")
                return reconcile.Result{}, nil
        }
-       return profiles.NewReconciler(r.Client, r.Config, r.Recorder, 
workflow).Reconcile(ctx, workflow)
+       return profilesfactory.NewReconciler(r.Client, r.Config, r.Recorder, 
workflow).Reconcile(ctx, workflow)
 }
 
 // TODO: move to webhook see 
https://github.com/apache/incubator-kie-tools/packages/sonataflow-operator/pull/239
@@ -134,6 +154,91 @@ func (r *SonataFlowReconciler) setDefaults(workflow 
*operatorapi.SonataFlow) {
        }
 }
 
+// applyFinalizers manages the execution of the workflow finalizers.
+//
+// If the TriggerFinalizer is present, the triggers are cleaned up first. If the WorkflowFinalizer
+// is present and the deletion notification has neither succeeded nor exhausted
+// constants.MaxWorkflowFinalizerAttempts, a new attempt is registered in the workflow status and
+// the deletion notification is scheduled:
+//   - when scheduling fails, the request is requeued after
+//     constants.WorkflowFinalizerSchedulingRetryInterval;
+//   - when the notification was scheduled, the request is requeued after
+//     constants.WorkflowFinalizerRetryInterval so the outcome can be checked;
+//   - otherwise the WorkflowFinalizer is removed from the workflow.
+func (r *SonataFlowReconciler) applyFinalizers(ctx context.Context, workflow *operatorapi.SonataFlow) (ctrl.Result, error) {
+       if controllerutil.ContainsFinalizer(workflow, constants.TriggerFinalizer) {
+               if err := r.cleanupTriggers(ctx, workflow); err != nil {
+                       return ctrl.Result{}, err
+               }
+       }
+       if !controllerutil.ContainsFinalizer(workflow, constants.WorkflowFinalizer) {
+               return ctrl.Result{}, nil
+       }
+
+       wasScheduled := false
+       if !workflow.Status.FinalizerSucceed && workflow.Status.FinalizerAttempts < constants.MaxWorkflowFinalizerAttempts {
+               now := metav1.Now()
+               workflow.Status.FinalizerAttempts = workflow.Status.FinalizerAttempts + 1
+               workflow.Status.LastTimeFinalizerAttempt = &now
+               if err := r.Client.Status().Update(ctx, workflow); err != nil {
+                       return ctrl.Result{}, err
+               }
+               var err error
+               if wasScheduled, err = scheduleWorkflowDeletionNotification(r.Client, workflow); err != nil {
+                       remaining := constants.MaxWorkflowFinalizerAttempts - workflow.Status.FinalizerAttempts
+                       // ErrorS expects alternating key/value pairs after the message.
+                       if remaining > 0 {
+                               klog.V(log.E).ErrorS(err, fmt.Sprintf("Failed to schedule workflow deletion notification, %d remaining attempts are left", remaining), "workflow", workflow.Name, "namespace", workflow.Namespace)
+                       } else {
+                               klog.V(log.E).ErrorS(err, "Failed to schedule workflow deletion notification, no attempts are left.", "workflow", workflow.Name, "namespace", workflow.Namespace)
+                       }
+                       return ctrl.Result{RequeueAfter: constants.WorkflowFinalizerSchedulingRetryInterval}, nil
+               }
+       }
+       if wasScheduled {
+               return ctrl.Result{RequeueAfter: constants.WorkflowFinalizerRetryInterval}, nil
+       }
+
+       // Notification succeeded, was not needed, or no attempts are left: release the workflow.
+       controllerutil.RemoveFinalizer(workflow, constants.WorkflowFinalizer)
+       if err := r.Client.Update(ctx, workflow); err != nil {
+               return ctrl.Result{}, err
+       }
+       return ctrl.Result{}, nil
+}
+
+// scheduleWorkflowDeletionNotification enqueues, on the SonataFlow controller worker, the
+// delivery of the workflow deletion notification.
+//
+// It returns true when the notification was scheduled, and false when no events target URL is
+// resolved for the workflow (nothing to notify). An error is returned when the target URL cannot
+// be calculated.
+func scheduleWorkflowDeletionNotification(cli client.Client, workflow *operatorapi.SonataFlow) (bool, error) {
+       eventTargetUrl, err := eventing.GetWorkflowDefinitionEventsTargetURL(cli, workflow)
+       if err != nil {
+               return false, fmt.Errorf("failed to get workflow definition events target url to send the workflow definition status update event: %v", err)
+       }
+       if len(eventTargetUrl) == 0 {
+               return false, nil
+       }
+       // Snapshot the workflow before handing it to the worker, so the asynchronous task never
+       // observes modifications made by the caller after this function returns.
+       wf := workflow.DeepCopy()
+       manager.GetSFCWorker().RunAsync(func() {
+               if err := notifyWorkflowDeletion(cli, wf, eventTargetUrl); err != nil {
+                       // ErrorS expects alternating key/value pairs after the message.
+                       klog.V(log.E).ErrorS(err, "Failed to notify workflow deletion, controller will schedule a new retry if remaining attempts > 0.", "workflow", wf.Name, "namespace", wf.Namespace)
+               }
+       })
+       return true, nil
+}
+
+// notifyWorkflowDeletion sends the "unavailable" workflow definition event for the given
+// workflow to eventTargetUrl and, once delivered, marks Status.FinalizerSucceed on the latest
+// version of the workflow.
+//
+// The event is sent exactly once per invocation; only the read-modify-write of the status is
+// retried on conflict, so a conflicting status update no longer causes the event to be re-sent.
+// A delivery failure is returned to the caller; the controller programs a new notification based
+// on the remaining FinalizerAttempts if needed.
+func notifyWorkflowDeletion(cli client.Client, workflow *operatorapi.SonataFlow, eventTargetUrl string) error {
+       evt := workflowdef.NewWorkflowDefinitionAvailabilityEvent(workflow, workflowdef.SonataFlowOperatorSource, properties.GetWorkflowEndpointUrlWithNameAndNamespace(workflow.Name, workflow.Namespace), false)
+       ctx, cancel := context.WithTimeout(context.Background(), constants.EventDeliveryTimeout)
+       defer cancel()
+       if err := utils.SendCloudEventWithContext(evt, ctx, eventTargetUrl); err != nil {
+               return fmt.Errorf("failed to send workflow definition status update event: %v", err)
+       }
+
+       key := types.NamespacedName{Name: workflow.Name, Namespace: workflow.Namespace}
+       return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+               // Always start from the latest stored version to minimize conflicts.
+               latest := &operatorapi.SonataFlow{}
+               if err := cli.Get(context.Background(), key, latest); err != nil {
+                       return err
+               }
+               latest.Status.FinalizerSucceed = true
+               return cli.Status().Update(context.Background(), latest)
+       })
+}
+
 func (r *SonataFlowReconciler) cleanupTriggers(ctx context.Context, workflow 
*operatorapi.SonataFlow) error {
        for _, triggerRef := range workflow.Status.Triggers {
                trigger := &eventingv1.Trigger{
@@ -225,6 +330,15 @@ func buildEnqueueRequestsFromMapFunc(c client.Client, b 
*operatorapi.SonataFlowB
 func (r *SonataFlowReconciler) SetupWithManager(mgr ctrl.Manager) error {
        builder := ctrl.NewControllerManagedBy(mgr).
                For(&operatorapi.SonataFlow{}).
+               WithEventFilter(predicate.Funcs{
+                       UpdateFunc: func(e event.UpdateEvent) bool {
+                               oldGeneration := e.ObjectOld.GetGeneration()
+                               newGeneration := e.ObjectNew.GetGeneration()
+                               // Generation is only updated on spec changes 
(also on deletion), not upon metadata or status changes.
+                               // Filter out events where the generation 
hasn't changed to avoid being triggered by status updates.
+                               return oldGeneration != newGeneration
+                       },
+               }).
                Owns(&appsv1.Deployment{}).
                Owns(&corev1.Service{}).
                Owns(&corev1.ConfigMap{}).
diff --git 
a/packages/sonataflow-operator/internal/controller/workflowdef/events.go 
b/packages/sonataflow-operator/internal/controller/workflowdef/events.go
new file mode 100644
index 00000000000..e9f6d831856
--- /dev/null
+++ b/packages/sonataflow-operator/internal/controller/workflowdef/events.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package workflowdef
+
+import (
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/metadata"
+
+       operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
+       cloudevents "github.com/cloudevents/sdk-go/v2"
+)
+
+const SonataFlowOperatorSource = "sonataflow.org/operator"
+
+// NewWorkflowDefinitionAvailabilityEvent builds a CloudEvent of type "ProcessDefinitionEvent"
+// describing the given workflow. The workflow name is used as the definition id and name, and as
+// the "kogitoprocid" and "partitionkey" extensions; the version is read from the
+// metadata.Version annotation; serviceUrl is published as the endpoint; and the metadata status
+// is "available" or "unavailable" depending on the available argument.
+//
+// The error returned by SetData is discarded.
+func NewWorkflowDefinitionAvailabilityEvent(workflow *operatorapi.SonataFlow, eventSource string, serviceUrl string, available bool) *cloudevents.Event {
+       availability := "unavailable"
+       if available {
+               availability = "available"
+       }
+
+       payload := map[string]interface{}{
+               "id":       workflow.Name,
+               "name":     workflow.Name,
+               "version":  workflow.ObjectMeta.Annotations[metadata.Version],
+               "type":     "SW",
+               "endpoint": serviceUrl,
+               "metadata": map[string]interface{}{
+                       "status": availability,
+               },
+               "nodes": [0]string{},
+       }
+
+       evt := cloudevents.NewEvent(cloudevents.VersionV1)
+       evt.SetType("ProcessDefinitionEvent")
+       evt.SetSource(eventSource)
+       evt.SetExtension("kogitoprocid", workflow.Name)
+       evt.SetExtension("partitionkey", workflow.Name)
+       _ = evt.SetData(cloudevents.ApplicationJSON, payload)
+       return &evt
+}
diff --git a/packages/sonataflow-operator/internal/manager/worker.go 
b/packages/sonataflow-operator/internal/manager/worker.go
new file mode 100644
index 00000000000..3d4514a0fe1
--- /dev/null
+++ b/packages/sonataflow-operator/internal/manager/worker.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package manager
+
+import (
+       "time"
+)
+
+const (
+       SonataFlowControllerWorkerSize = 100
+)
+
+// Package-level state shared by the controllers: the worker used to run asynchronous tasks and
+// the instant registered as the operator start time.
+var (
+       sonataFlowControllerWorker *Worker
+       operatorStartTime          time.Time
+)
+
+// Runnable is a unit of work that can be executed by a Worker.
+type Runnable func()
+
+// Worker executes the Runnables received through its channel.
+type Worker struct {
+       ch chan Runnable
+}
+
+// SetOperatorStartTime registers the current time as the operator start time.
+func SetOperatorStartTime() {
+       operatorStartTime = time.Now()
+}
+
+// GetOperatorStartTime returns the time registered by SetOperatorStartTime, or the zero
+// time.Time when it was never called.
+func GetOperatorStartTime() time.Time {
+       return operatorStartTime
+}
+
+// GetSFCWorker returns the package-level SonataFlow controller worker. The returned pointer is
+// nil until InitializeSFCWorker has been called.
+func GetSFCWorker() *Worker {
+       return sonataFlowControllerWorker
+}
+
+// InitializeSFCWorker creates a Worker whose queue can buffer up to size Runnables, starts it,
+// stores it as the package-level SonataFlow controller worker and returns it. Every call
+// replaces the previously stored worker.
+func InitializeSFCWorker(size int) *Worker {
+       sfcWorker := NewWorker(size)
+       sonataFlowControllerWorker = &sfcWorker
+       sonataFlowControllerWorker.Start()
+       return sonataFlowControllerWorker
+}
+
+// NewWorker returns a Worker backed by a channel buffered for size Runnables. The worker does
+// not execute anything until Start is called.
+func NewWorker(size int) Worker {
+       return Worker{ch: make(chan Runnable, size)}
+}
+
+// Start launches a goroutine that executes, one at a time and in arrival order, the Runnables
+// received through the worker channel. The goroutine finishes when the channel is closed.
+func (w Worker) Start() {
+       go func(tasks chan Runnable) {
+               for task := range tasks {
+                       task()
+               }
+       }(w.ch)
+}
+
+// RunAsync enqueues r on the worker channel for later execution. The send blocks the caller
+// while the channel buffer is full.
+func (w Worker) RunAsync(r Runnable) {
+       w.ch <- r
+}
diff --git a/packages/sonataflow-operator/operator.yaml 
b/packages/sonataflow-operator/operator.yaml
index e54b4aad101..8bd04af5ada 100644
--- a/packages/sonataflow-operator/operator.yaml
+++ b/packages/sonataflow-operator/operator.yaml
@@ -27509,12 +27509,22 @@ spec:
                 endpoint:
                   description: Endpoint is an externally accessible URL of the 
workflow
                   type: string
+                finalizerAttempts:
+                  type: integer
+                finalizerSucceed:
+                  type: boolean
                 flowCRC:
                   format: int32
                   type: integer
+                lastTimeFinalizerAttempt:
+                  format: date-time
+                  type: string
                 lastTimeRecoverAttempt:
                   format: date-time
                   type: string
+                lastTimeStatusNotified:
+                  format: date-time
+                  type: string
                 observedGeneration:
                   description: The generation observed by the deployment 
controller.
                   format: int64
diff --git a/packages/sonataflow-operator/utils/events.go 
b/packages/sonataflow-operator/utils/events.go
new file mode 100644
index 00000000000..674264872af
--- /dev/null
+++ b/packages/sonataflow-operator/utils/events.go
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+
+       cloudevents "github.com/cloudevents/sdk-go/v2"
+       cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
+)
+
+// SendCloudEvent sends a cloud event to the given url using the http protocol binding. By
+// default, events are sent in binary mode. It delegates to SendCloudEventWithContext using
+// context.TODO(), so no timeout or cancellation is applied by this function.
+func SendCloudEvent(event *cloudevents.Event, url string) error {
+       return SendCloudEventWithContext(event, context.TODO(), url)
+}
+
+// SendCloudEventWithContext sends a cloud event to the given url using the http protocol
+// binding. By default, events are sent in binary mode. A new HTTP protocol and client are
+// created on every call, and the event is stamped with the current time and a UUID by the
+// client options.
+//
+// An error is returned when the protocol or the client cannot be created, when the event is
+// reported as undelivered, when the send result is not an HTTP result, or when the HTTP status
+// code is not accepted by resultOK.
+func SendCloudEventWithContext(event *cloudevents.Event, ctx context.Context, url string) error {
+       protocol, err := cloudevents.NewHTTP()
+       if err != nil {
+               return err
+       }
+       ceClient, err := cloudevents.NewClient(protocol, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
+       if err != nil {
+               return err
+       }
+
+       result := ceClient.Send(cloudevents.ContextWithTarget(ctx, url), *event)
+       if cloudevents.IsUndelivered(result) {
+               return fmt.Errorf("failed to send cloud event to url: %s, err: %s", url, result.Error())
+       }
+
+       var httpResult *cehttp.Result
+       if !cloudevents.ResultAs(result, &httpResult) {
+               return fmt.Errorf("failed to send cloud event to url: %s, Send did not return an HTTP response: %s", url, result)
+       }
+       if !resultOK(httpResult) {
+               return fmt.Errorf("failed to send cloud event to url: %s, err: %s", url, httpResult.Error())
+       }
+       return nil
+}
+
+// resultOK reports whether the HTTP result carries one of the status codes treated as a
+// successful delivery: 200 (OK) or 202 (Accepted).
+func resultOK(httpResult *cehttp.Result) bool {
+       switch httpResult.StatusCode {
+       case http.StatusOK, http.StatusAccepted:
+               return true
+       default:
+               return false
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to