This is an automated email from the ASF dual-hosted git repository.

wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c0e6a4d0 [issue-521] Enhance error messages in SonataFlowPlatform CR status for knative integration (#558)
c0e6a4d0 is described below

commit c0e6a4d0f1cf8119cda126680f8051f55e73d313
Author: Jianrong Zhang <[email protected]>
AuthorDate: Thu Oct 31 05:48:39 2024 -0400

    [issue-521] Enhance error messages in SonataFlowPlatform CR status for knative integration (#558)
---
 internal/controller/platform/action.go             |  6 +--
 internal/controller/platform/create.go             |  6 ++-
 internal/controller/platform/initialize.go         | 16 +++----
 internal/controller/platform/k8s.go                | 52 ++++++++++++----------
 internal/controller/platform/monitor.go            |  7 +--
 internal/controller/platform/services/services.go  | 27 +++++++----
 internal/controller/platform/warm.go               | 10 ++---
 .../controller/sonataflowplatform_controller.go    |  8 ++--
 .../sonataflowplatform_controller_test.go          |  4 +-
 9 files changed, 79 insertions(+), 57 deletions(-)

diff --git a/internal/controller/platform/action.go 
b/internal/controller/platform/action.go
index 58a21ddb..0dc082ed 100644
--- a/internal/controller/platform/action.go
+++ b/internal/controller/platform/action.go
@@ -22,9 +22,9 @@ package platform
 import (
        "context"
 
-       
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
-
        v08 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+       
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
+       corev1 "k8s.io/api/core/v1"
 )
 
 // Action --.
@@ -38,7 +38,7 @@ type Action interface {
        CanHandle(platform *v08.SonataFlowPlatform) bool
 
        // executes the handling function
-       Handle(ctx context.Context, platform *v08.SonataFlowPlatform) 
(*v08.SonataFlowPlatform, error)
+       Handle(ctx context.Context, platform *v08.SonataFlowPlatform) 
(*v08.SonataFlowPlatform, *corev1.Event, error)
 }
 
 type baseAction struct {
diff --git a/internal/controller/platform/create.go 
b/internal/controller/platform/create.go
index f3dbbe48..ef825a3b 100644
--- a/internal/controller/platform/create.go
+++ b/internal/controller/platform/create.go
@@ -22,6 +22,8 @@ package platform
 import (
        "context"
 
+       corev1 "k8s.io/api/core/v1"
+
        "github.com/apache/incubator-kie-kogito-serverless-operator/api"
        v08 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
 )
@@ -43,9 +45,9 @@ func (action *createAction) CanHandle(platform 
*v08.SonataFlowPlatform) bool {
        return platform.Status.IsCreating()
 }
 
-func (action *createAction) Handle(ctx context.Context, platform 
*v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) {
+func (action *createAction) Handle(ctx context.Context, platform 
*v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) {
        //TODO: Perform the actions needed for the Platform creation
        platform.Status.Manager().MarkTrue(api.SucceedConditionType)
 
-       return platform, nil
+       return platform, nil, nil
 }
diff --git a/internal/controller/platform/initialize.go 
b/internal/controller/platform/initialize.go
index 9c486a12..c46bd93b 100644
--- a/internal/controller/platform/initialize.go
+++ b/internal/controller/platform/initialize.go
@@ -61,24 +61,24 @@ func (action *initializeAction) CanHandle(platform 
*operatorapi.SonataFlowPlatfo
        return platform.Status.GetTopLevelCondition().IsUnknown() || 
platform.Status.IsDuplicated()
 }
 
-func (action *initializeAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *initializeAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, 
*corev1.Event, error) {
        duplicate, err := action.isPrimaryDuplicate(ctx, platform)
        if err != nil {
-               return nil, err
+               return nil, nil, err
        }
        if duplicate {
                // another platform already present in the namespace
                if !platform.Status.IsDuplicated() {
                        plat := platform.DeepCopy()
                        
plat.Status.Manager().MarkFalse(api.SucceedConditionType, 
operatorapi.PlatformDuplicatedReason, "")
-                       return plat, nil
+                       return plat, nil, nil
                }
 
-               return nil, nil
+               return nil, nil, nil
        }
 
        if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil {
-               return nil, err
+               return nil, nil, err
        }
        // nolint: staticcheck
        if platform.Spec.Build.Config.BuildStrategy == 
operatorapi.OperatorBuildStrategy {
@@ -88,13 +88,13 @@ func (action *initializeAction) Handle(ctx context.Context, 
platform *operatorap
                        klog.V(log.I).InfoS("Create persistent volume claim")
                        err := createPersistentVolumeClaim(ctx, action.client, 
platform)
                        if err != nil {
-                               return nil, err
+                               return nil, nil, err
                        }
                        // Create the Kaniko warmer pod that caches the base 
image into the SonataFlow builder volume
                        klog.V(log.I).InfoS("Create Kaniko cache warmer pod")
                        err = createKanikoCacheWarmerPod(ctx, action.client, 
platform)
                        if err != nil {
-                               return nil, err
+                               return nil, nil, err
                        }
                        
platform.Status.Manager().MarkFalse(api.SucceedConditionType, 
operatorapi.PlatformWarmingReason, "")
                } else {
@@ -106,7 +106,7 @@ func (action *initializeAction) Handle(ctx context.Context, 
platform *operatorap
        }
        platform.Status.Version = metadata.SpecVersion
 
-       return platform, nil
+       return platform, nil, nil
 }
 
 // TODO: move this to Kaniko packages based on the platform context
diff --git a/internal/controller/platform/k8s.go 
b/internal/controller/platform/k8s.go
index 9433f5a8..b5c0bde1 100644
--- a/internal/controller/platform/k8s.go
+++ b/internal/controller/platform/k8s.go
@@ -61,38 +61,38 @@ func (action *serviceAction) CanHandle(platform 
*operatorapi.SonataFlowPlatform)
        return platform.Status.IsReady()
 }
 
-func (action *serviceAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *serviceAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, 
*corev1.Event, error) {
        // Refresh applied configuration
        if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
-               return nil, err
+               return nil, nil, err
        }
 
        psDI := services.NewDataIndexHandler(platform)
        if psDI.IsServiceSetInSpec() {
-               if err := createOrUpdateServiceComponents(ctx, action.client, 
platform, psDI); err != nil {
-                       return nil, err
+               if event, err := createOrUpdateServiceComponents(ctx, 
action.client, platform, psDI); err != nil {
+                       return nil, event, err
                }
        }
 
        psJS := services.NewJobServiceHandler(platform)
        if psJS.IsServiceSetInSpec() {
-               if err := createOrUpdateServiceComponents(ctx, action.client, 
platform, psJS); err != nil {
-                       return nil, err
+               if event, err := createOrUpdateServiceComponents(ctx, 
action.client, platform, psJS); err != nil {
+                       return nil, event, err
                }
        }
 
-       return platform, nil
+       return platform, nil, nil
 }
 
-func createOrUpdateServiceComponents(ctx context.Context, client 
client.Client, platform *operatorapi.SonataFlowPlatform, psh 
services.PlatformServiceHandler) error {
+func createOrUpdateServiceComponents(ctx context.Context, client 
client.Client, platform *operatorapi.SonataFlowPlatform, psh 
services.PlatformServiceHandler) (*corev1.Event, error) {
        if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != 
nil {
-               return err
+               return nil, err
        }
        if err := createOrUpdateDeployment(ctx, client, platform, psh); err != 
nil {
-               return err
+               return nil, err
        }
        if err := createOrUpdateService(ctx, client, platform, psh); err != nil 
{
-               return err
+               return nil, err
        }
        return createOrUpdateKnativeResources(ctx, client, platform, psh)
 }
@@ -307,24 +307,24 @@ func setSonataFlowPlatformFinalizer(ctx context.Context, 
c client.Client, platfo
        return nil
 }
 
-func createOrUpdateKnativeResources(ctx context.Context, client client.Client, 
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) 
error {
+func createOrUpdateKnativeResources(ctx context.Context, client client.Client, 
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) 
(*corev1.Event, error) {
        lbl, _ := getLabels(platform, psh)
-       objs, err := psh.GenerateKnativeResources(platform, lbl)
+       objs, event, err := psh.GenerateKnativeResources(platform, lbl)
        if err != nil {
-               return err
+               return event, err
        }
        // Create or update triggers
        for _, obj := range objs {
                if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
                        if platform.Namespace == obj.GetNamespace() {
                                if err := 
controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != 
nil {
-                                       return err
+                                       return nil, err
                                }
                        } else {
                                // This is for Knative trigger in a different 
namespace
                                // Set the finalizer for trigger cleanup when 
the platform is deleted
                                if err := setSonataFlowPlatformFinalizer(ctx, 
client, platform); err != nil {
-                                       return err
+                                       return nil, err
                                }
                        }
                        trigger := &eventingv1.Trigger{
@@ -335,21 +335,21 @@ func createOrUpdateKnativeResources(ctx context.Context, 
client client.Client, p
                                return nil
                        })
                        if err != nil {
-                               return err
+                               return nil, err
                        }
                        addToSonataFlowPlatformTriggerList(platform, trigger)
                }
        }
 
        if err := SafeUpdatePlatformStatus(ctx, platform); err != nil {
-               return err
+               return nil, err
        }
 
        // Create or update sinkbindings
        for _, obj := range objs {
                if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
                        if err := 
controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != 
nil {
-                               return err
+                               return nil, err
                        }
                        sinkBinding := &sourcesv1.SinkBinding{
                                ObjectMeta: sbDef.ObjectMeta,
@@ -359,18 +359,24 @@ func createOrUpdateKnativeResources(ctx context.Context, 
client client.Client, p
                                return nil
                        })
                        if err != nil {
-                               return err
+                               return nil, err
                        }
                        kSinkInjected, err := psh.CheckKSinkInjected()
                        if err != nil {
-                               return err
+                               return nil, err
                        }
                        if !kSinkInjected {
-                               return fmt.Errorf("waiting for K_SINK injection 
for %s to complete", psh.GetServiceName())
+                               msg := fmt.Sprintf("waiting for K_SINK 
injection for service %s to complete", psh.GetServiceName())
+                               event := &corev1.Event{
+                                       Type:    corev1.EventTypeWarning,
+                                       Reason:  
services.WaitingKnativeEventing,
+                                       Message: msg,
+                               }
+                               return event, fmt.Errorf(msg)
                        }
                }
        }
-       return nil
+       return nil, nil
 }
 
 func addToSonataFlowPlatformTriggerList(platform 
*operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) {
diff --git a/internal/controller/platform/monitor.go 
b/internal/controller/platform/monitor.go
index ed10326a..d3f5c1cd 100644
--- a/internal/controller/platform/monitor.go
+++ b/internal/controller/platform/monitor.go
@@ -22,6 +22,7 @@ package platform
 import (
        "context"
 
+       corev1 "k8s.io/api/core/v1"
        "k8s.io/klog/v2"
 
        
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
@@ -46,7 +47,7 @@ func (action *monitorAction) CanHandle(platform 
*operatorapi.SonataFlowPlatform)
        return platform.Status.IsReady()
 }
 
-func (action *monitorAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *monitorAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, 
*corev1.Event, error) {
        // Just track the version of the operator in the platform resource
        if platform.Status.Version != metadata.SpecVersion {
                platform.Status.Version = metadata.SpecVersion
@@ -55,8 +56,8 @@ func (action *monitorAction) Handle(ctx context.Context, 
platform *operatorapi.S
 
        // Refresh applied configuration
        if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
-               return nil, err
+               return nil, nil, err
        }
 
-       return platform, nil
+       return platform, nil, nil
 }
diff --git a/internal/controller/platform/services/services.go 
b/internal/controller/platform/services/services.go
index 4fb67a35..bf448728 100644
--- a/internal/controller/platform/services/services.go
+++ b/internal/controller/platform/services/services.go
@@ -49,6 +49,7 @@ import (
 const (
        quarkusHibernateORMDatabaseGeneration string = 
"QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
        quarkusFlywayMigrateAtStart           string = 
"QUARKUS_FLYWAY_MIGRATE_AT_START"
+       WaitingKnativeEventing                       = "WaitingKnativeEventing"
 )
 
 type PlatformServiceHandler interface {
@@ -83,7 +84,7 @@ type PlatformServiceHandler interface {
        // GenerateServiceProperties returns a property object that contains 
the application properties required by the service deployment
        GenerateServiceProperties() (*properties.Properties, error)
        // GenerateKnativeResources returns knative resources that bridge 
between workflow deploys and the service
-       GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl 
map[string]string) ([]client.Object, error)
+       GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl 
map[string]string) ([]client.Object, *corev1.Event, error)
 
        // IsServiceSetInSpec returns true if the service is set in the spec.
        IsServiceSetInSpec() bool
@@ -582,10 +583,10 @@ func (d *DataIndexHandler) newTrigger(labels 
map[string]string, brokerName, name
                },
        }
 }
-func (d *DataIndexHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
error) {
+func (d *DataIndexHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
*corev1.Event, error) {
        broker := d.GetSourceBroker()
        if broker == nil || len(broker.Ref.Name) == 0 {
-               return nil, nil // Nothing to do
+               return nil, nil, nil // Nothing to do
        }
        brokerName := broker.Ref.Name
        namespace := broker.Ref.Namespace
@@ -593,7 +594,12 @@ func (d *DataIndexHandler) 
GenerateKnativeResources(platform *operatorapi.Sonata
                namespace = platform.Namespace
        }
        if err := knative.ValidateBroker(brokerName, namespace); err != nil {
-               return nil, err
+               event := &corev1.Event{
+                       Type:    corev1.EventTypeWarning,
+                       Reason:  WaitingKnativeEventing,
+                       Message: fmt.Sprintf("%s for service: %s", err.Error(), 
d.GetServiceName()),
+               }
+               return nil, event, err
        }
        serviceName := d.GetServiceName()
        return []client.Object{
@@ -604,7 +610,7 @@ func (d *DataIndexHandler) 
GenerateKnativeResources(platform *operatorapi.Sonata
                d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-variable", "ProcessInstanceVariableDataEvent", 
constants.KogitoProcessInstancesEventsPath, platform),
                d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-definition", "ProcessDefinitionEvent", 
constants.KogitoProcessDefinitionsEventsPath, platform),
                d.newTrigger(lbl, brokerName, namespace, serviceName, 
"process-instance-multiple", "MultipleProcessInstanceDataEvent", 
constants.KogitoProcessInstancesMultiEventsPath, platform),
-               d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", 
"JobEvent", constants.KogitoJobsPath, platform)}, nil
+               d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs", 
"JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
 }
 
 func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
@@ -621,7 +627,7 @@ func (d JobServiceHandler) GetSink() *duckv1.Destination {
        return GetPlatformBroker(d.platform)
 }
 
-func (j *JobServiceHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
error) {
+func (j *JobServiceHandler) GenerateKnativeResources(platform 
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, 
*corev1.Event, error) {
        broker := j.GetSourceBroker()
        sink := j.GetSink()
        resultObjs := []client.Object{}
@@ -633,7 +639,12 @@ func (j *JobServiceHandler) 
GenerateKnativeResources(platform *operatorapi.Sonat
                        namespace = platform.Namespace
                }
                if err := knative.ValidateBroker(brokerName, namespace); err != 
nil {
-                       return nil, err
+                       event := &corev1.Event{
+                               Type:    corev1.EventTypeWarning,
+                               Reason:  WaitingKnativeEventing,
+                               Message: fmt.Sprintf("%s for service: %s", 
err.Error(), j.GetServiceName()),
+                       }
+                       return nil, event, err
                }
                jobCreateTrigger := &eventingv1.Trigger{
                        ObjectMeta: metav1.ObjectMeta{
@@ -713,7 +724,7 @@ func (j *JobServiceHandler) 
GenerateKnativeResources(platform *operatorapi.Sonat
                }
                resultObjs = append(resultObjs, sinkBinding)
        }
-       return resultObjs, nil
+       return resultObjs, nil, nil
 }
 
 func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) {
diff --git a/internal/controller/platform/warm.go 
b/internal/controller/platform/warm.go
index b76ff5f6..b9515507 100644
--- a/internal/controller/platform/warm.go
+++ b/internal/controller/platform/warm.go
@@ -54,7 +54,7 @@ func (action *warmAction) CanHandle(platform 
*operatorapi.SonataFlowPlatform) bo
        return platform.Status.IsWarming()
 }
 
-func (action *warmAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *warmAction) Handle(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, 
*corev1.Event, error) {
        // Check Kaniko warmer pod status
        pod := corev1.Pod{
                TypeMeta: metav1.TypeMeta{
@@ -69,19 +69,19 @@ func (action *warmAction) Handle(ctx context.Context, 
platform *operatorapi.Sona
 
        err := action.reader.Get(ctx, types.NamespacedName{Namespace: 
pod.Namespace, Name: pod.Name}, &pod)
        if err != nil {
-               return nil, err
+               return nil, nil, err
        }
 
        switch pod.Status.Phase {
        case corev1.PodSucceeded:
                klog.V(log.D).InfoS("Kaniko cache successfully warmed up")
                
platform.Status.Manager().MarkTrueWithReason(api.SucceedConditionType, 
operatorapi.PlatformWarmingReason, "Kaniko cache successfully warmed up")
-               return platform, nil
+               return platform, nil, nil
        case corev1.PodFailed:
-               return nil, errors.New("failed to warm up Kaniko cache")
+               return nil, nil, errors.New("failed to warm up Kaniko cache")
        default:
                klog.V(log.I).InfoS("Waiting for Kaniko cache to warm up...")
                // Requeue
-               return nil, nil
+               return nil, nil, nil
        }
 }
diff --git a/internal/controller/sonataflowplatform_controller.go 
b/internal/controller/sonataflowplatform_controller.go
index 6126e165..195fb091 100644
--- a/internal/controller/sonataflowplatform_controller.go
+++ b/internal/controller/sonataflowplatform_controller.go
@@ -28,8 +28,6 @@ import (
        
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
        sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
 
-       "k8s.io/klog/v2"
-
        "github.com/apache/incubator-kie-kogito-serverless-operator/api"
        operatorapi 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
        clientr 
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
@@ -46,6 +44,7 @@ import (
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/record"
+       "k8s.io/klog/v2"
        eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
        ctrlrun "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
@@ -155,7 +154,10 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx 
context.Context, req reconc
 
                        klog.V(log.I).InfoS("Invoking action", "Name", a.Name())
 
-                       target, err = a.Handle(ctx, target)
+                       target, event, err := a.Handle(ctx, target)
+                       if event != nil {
+                               r.Recorder.Event(&instance, event.Type, 
event.Reason, event.Message)
+                       }
                        if err != nil {
                                if target != nil {
                                        
target.Status.Manager().MarkFalse(api.SucceedConditionType, 
operatorapi.PlatformFailureReason, err.Error())
diff --git a/internal/controller/sonataflowplatform_controller_test.go 
b/internal/controller/sonataflowplatform_controller_test.go
index dbadd2e8..af9280a3 100644
--- a/internal/controller/sonataflowplatform_controller_test.go
+++ b/internal/controller/sonataflowplatform_controller_test.go
@@ -886,7 +886,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
                        },
                }
                _, err := r.Reconcile(context.TODO(), req)
-               if err != nil && err.Error() != "waiting for K_SINK injection 
for sonataflow-platform-jobs-service to complete" {
+               if err != nil && err.Error() != "waiting for K_SINK injection 
for service sonataflow-platform-jobs-service to complete" {
                        t.Fatalf("reconcile: (%v)", err)
                }
 
@@ -987,7 +987,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
                        },
                }
                _, err := r.Reconcile(context.TODO(), req)
-               if err != nil && err.Error() != "waiting for K_SINK injection 
for sonataflow-platform-jobs-service to complete" {
+               if err != nil && err.Error() != "waiting for K_SINK injection 
for service sonataflow-platform-jobs-service to complete" {
                        t.Fatalf("reconcile: (%v)", err)
                }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to