This is an automated email from the ASF dual-hosted git repository.
wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c0e6a4d0 [issue-521] Enhance error messages in SonataFlowPlatform CR status for knative integration (#558)
c0e6a4d0 is described below
commit c0e6a4d0f1cf8119cda126680f8051f55e73d313
Author: Jianrong Zhang <[email protected]>
AuthorDate: Thu Oct 31 05:48:39 2024 -0400
[issue-521] Enhance error messages in SonataFlowPlatform CR status for knative integration (#558)
---
internal/controller/platform/action.go | 6 +--
internal/controller/platform/create.go | 6 ++-
internal/controller/platform/initialize.go | 16 +++----
internal/controller/platform/k8s.go | 52 ++++++++++++----------
internal/controller/platform/monitor.go | 7 +--
internal/controller/platform/services/services.go | 27 +++++++----
internal/controller/platform/warm.go | 10 ++---
.../controller/sonataflowplatform_controller.go | 8 ++--
.../sonataflowplatform_controller_test.go | 4 +-
9 files changed, 79 insertions(+), 57 deletions(-)
diff --git a/internal/controller/platform/action.go b/internal/controller/platform/action.go
index 58a21ddb..0dc082ed 100644
--- a/internal/controller/platform/action.go
+++ b/internal/controller/platform/action.go
@@ -22,9 +22,9 @@ package platform
import (
"context"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
-
v08
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
+ corev1 "k8s.io/api/core/v1"
)
// Action --.
@@ -38,7 +38,7 @@ type Action interface {
CanHandle(platform *v08.SonataFlowPlatform) bool
// executes the handling function
- Handle(ctx context.Context, platform *v08.SonataFlowPlatform)
(*v08.SonataFlowPlatform, error)
+ Handle(ctx context.Context, platform *v08.SonataFlowPlatform)
(*v08.SonataFlowPlatform, *corev1.Event, error)
}
type baseAction struct {
diff --git a/internal/controller/platform/create.go b/internal/controller/platform/create.go
index f3dbbe48..ef825a3b 100644
--- a/internal/controller/platform/create.go
+++ b/internal/controller/platform/create.go
@@ -22,6 +22,8 @@ package platform
import (
"context"
+ corev1 "k8s.io/api/core/v1"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
v08
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
)
@@ -43,9 +45,9 @@ func (action *createAction) CanHandle(platform
*v08.SonataFlowPlatform) bool {
return platform.Status.IsCreating()
}
-func (action *createAction) Handle(ctx context.Context, platform
*v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) {
+func (action *createAction) Handle(ctx context.Context, platform
*v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) {
//TODO: Perform the actions needed for the Platform creation
platform.Status.Manager().MarkTrue(api.SucceedConditionType)
- return platform, nil
+ return platform, nil, nil
}
diff --git a/internal/controller/platform/initialize.go b/internal/controller/platform/initialize.go
index 9c486a12..c46bd93b 100644
--- a/internal/controller/platform/initialize.go
+++ b/internal/controller/platform/initialize.go
@@ -61,24 +61,24 @@ func (action *initializeAction) CanHandle(platform
*operatorapi.SonataFlowPlatfo
return platform.Status.GetTopLevelCondition().IsUnknown() ||
platform.Status.IsDuplicated()
}
-func (action *initializeAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *initializeAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform,
*corev1.Event, error) {
duplicate, err := action.isPrimaryDuplicate(ctx, platform)
if err != nil {
- return nil, err
+ return nil, nil, err
}
if duplicate {
// another platform already present in the namespace
if !platform.Status.IsDuplicated() {
plat := platform.DeepCopy()
plat.Status.Manager().MarkFalse(api.SucceedConditionType,
operatorapi.PlatformDuplicatedReason, "")
- return plat, nil
+ return plat, nil, nil
}
- return nil, nil
+ return nil, nil, nil
}
if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil {
- return nil, err
+ return nil, nil, err
}
// nolint: staticcheck
if platform.Spec.Build.Config.BuildStrategy ==
operatorapi.OperatorBuildStrategy {
@@ -88,13 +88,13 @@ func (action *initializeAction) Handle(ctx context.Context,
platform *operatorap
klog.V(log.I).InfoS("Create persistent volume claim")
err := createPersistentVolumeClaim(ctx, action.client,
platform)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Create the Kaniko warmer pod that caches the base
image into the SonataFlow builder volume
klog.V(log.I).InfoS("Create Kaniko cache warmer pod")
err = createKanikoCacheWarmerPod(ctx, action.client,
platform)
if err != nil {
- return nil, err
+ return nil, nil, err
}
platform.Status.Manager().MarkFalse(api.SucceedConditionType,
operatorapi.PlatformWarmingReason, "")
} else {
@@ -106,7 +106,7 @@ func (action *initializeAction) Handle(ctx context.Context,
platform *operatorap
}
platform.Status.Version = metadata.SpecVersion
- return platform, nil
+ return platform, nil, nil
}
// TODO: move this to Kaniko packages based on the platform context
diff --git a/internal/controller/platform/k8s.go b/internal/controller/platform/k8s.go
index 9433f5a8..b5c0bde1 100644
--- a/internal/controller/platform/k8s.go
+++ b/internal/controller/platform/k8s.go
@@ -61,38 +61,38 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
-func (action *serviceAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *serviceAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform,
*corev1.Event, error) {
// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
- return nil, err
+ return nil, nil, err
}
psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
- if err := createOrUpdateServiceComponents(ctx, action.client,
platform, psDI); err != nil {
- return nil, err
+ if event, err := createOrUpdateServiceComponents(ctx,
action.client, platform, psDI); err != nil {
+ return nil, event, err
}
}
psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
- if err := createOrUpdateServiceComponents(ctx, action.client,
platform, psJS); err != nil {
- return nil, err
+ if event, err := createOrUpdateServiceComponents(ctx,
action.client, platform, psJS); err != nil {
+ return nil, event, err
}
}
- return platform, nil
+ return platform, nil, nil
}
-func createOrUpdateServiceComponents(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform, psh
services.PlatformServiceHandler) error {
+func createOrUpdateServiceComponents(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform, psh
services.PlatformServiceHandler) (*corev1.Event, error) {
if err := createOrUpdateConfigMap(ctx, client, platform, psh); err !=
nil {
- return err
+ return nil, err
}
if err := createOrUpdateDeployment(ctx, client, platform, psh); err !=
nil {
- return err
+ return nil, err
}
if err := createOrUpdateService(ctx, client, platform, psh); err != nil
{
- return err
+ return nil, err
}
return createOrUpdateKnativeResources(ctx, client, platform, psh)
}
@@ -307,24 +307,24 @@ func setSonataFlowPlatformFinalizer(ctx context.Context,
c client.Client, platfo
return nil
}
-func createOrUpdateKnativeResources(ctx context.Context, client client.Client,
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler)
error {
+func createOrUpdateKnativeResources(ctx context.Context, client client.Client,
platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler)
(*corev1.Event, error) {
lbl, _ := getLabels(platform, psh)
- objs, err := psh.GenerateKnativeResources(platform, lbl)
+ objs, event, err := psh.GenerateKnativeResources(platform, lbl)
if err != nil {
- return err
+ return event, err
}
// Create or update triggers
for _, obj := range objs {
if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
if platform.Namespace == obj.GetNamespace() {
if err :=
controllerutil.SetControllerReference(platform, obj, client.Scheme()); err !=
nil {
- return err
+ return nil, err
}
} else {
// This is for Knative trigger in a different
namespace
// Set the finalizer for trigger cleanup when
the platform is deleted
if err := setSonataFlowPlatformFinalizer(ctx,
client, platform); err != nil {
- return err
+ return nil, err
}
}
trigger := &eventingv1.Trigger{
@@ -335,21 +335,21 @@ func createOrUpdateKnativeResources(ctx context.Context,
client client.Client, p
return nil
})
if err != nil {
- return err
+ return nil, err
}
addToSonataFlowPlatformTriggerList(platform, trigger)
}
}
if err := SafeUpdatePlatformStatus(ctx, platform); err != nil {
- return err
+ return nil, err
}
// Create or update sinkbindings
for _, obj := range objs {
if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
if err :=
controllerutil.SetControllerReference(platform, obj, client.Scheme()); err !=
nil {
- return err
+ return nil, err
}
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: sbDef.ObjectMeta,
@@ -359,18 +359,24 @@ func createOrUpdateKnativeResources(ctx context.Context,
client client.Client, p
return nil
})
if err != nil {
- return err
+ return nil, err
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
- return err
+ return nil, err
}
if !kSinkInjected {
- return fmt.Errorf("waiting for K_SINK injection
for %s to complete", psh.GetServiceName())
+ msg := fmt.Sprintf("waiting for K_SINK
injection for service %s to complete", psh.GetServiceName())
+ event := &corev1.Event{
+ Type: corev1.EventTypeWarning,
+ Reason:
services.WaitingKnativeEventing,
+ Message: msg,
+ }
+ return event, fmt.Errorf(msg)
}
}
}
- return nil
+ return nil, nil
}
func addToSonataFlowPlatformTriggerList(platform
*operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) {
diff --git a/internal/controller/platform/monitor.go b/internal/controller/platform/monitor.go
index ed10326a..d3f5c1cd 100644
--- a/internal/controller/platform/monitor.go
+++ b/internal/controller/platform/monitor.go
@@ -22,6 +22,7 @@ package platform
import (
"context"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
@@ -46,7 +47,7 @@ func (action *monitorAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
-func (action *monitorAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *monitorAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform,
*corev1.Event, error) {
// Just track the version of the operator in the platform resource
if platform.Status.Version != metadata.SpecVersion {
platform.Status.Version = metadata.SpecVersion
@@ -55,8 +56,8 @@ func (action *monitorAction) Handle(ctx context.Context,
platform *operatorapi.S
// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
- return nil, err
+ return nil, nil, err
}
- return platform, nil
+ return platform, nil, nil
}
diff --git a/internal/controller/platform/services/services.go b/internal/controller/platform/services/services.go
index 4fb67a35..bf448728 100644
--- a/internal/controller/platform/services/services.go
+++ b/internal/controller/platform/services/services.go
@@ -49,6 +49,7 @@ import (
const (
quarkusHibernateORMDatabaseGeneration string =
"QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
quarkusFlywayMigrateAtStart string =
"QUARKUS_FLYWAY_MIGRATE_AT_START"
+ WaitingKnativeEventing = "WaitingKnativeEventing"
)
type PlatformServiceHandler interface {
@@ -83,7 +84,7 @@ type PlatformServiceHandler interface {
// GenerateServiceProperties returns a property object that contains
the application properties required by the service deployment
GenerateServiceProperties() (*properties.Properties, error)
// GenerateKnativeResources returns knative resources that bridge
between workflow deploys and the service
- GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl
map[string]string) ([]client.Object, error)
+ GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl
map[string]string) ([]client.Object, *corev1.Event, error)
// IsServiceSetInSpec returns true if the service is set in the spec.
IsServiceSetInSpec() bool
@@ -582,10 +583,10 @@ func (d *DataIndexHandler) newTrigger(labels
map[string]string, brokerName, name
},
}
}
-func (d *DataIndexHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
error) {
+func (d *DataIndexHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
*corev1.Event, error) {
broker := d.GetSourceBroker()
if broker == nil || len(broker.Ref.Name) == 0 {
- return nil, nil // Nothing to do
+ return nil, nil, nil // Nothing to do
}
brokerName := broker.Ref.Name
namespace := broker.Ref.Namespace
@@ -593,7 +594,12 @@ func (d *DataIndexHandler)
GenerateKnativeResources(platform *operatorapi.Sonata
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err != nil {
- return nil, err
+ event := &corev1.Event{
+ Type: corev1.EventTypeWarning,
+ Reason: WaitingKnativeEventing,
+ Message: fmt.Sprintf("%s for service: %s", err.Error(),
d.GetServiceName()),
+ }
+ return nil, event, err
}
serviceName := d.GetServiceName()
return []client.Object{
@@ -604,7 +610,7 @@ func (d *DataIndexHandler)
GenerateKnativeResources(platform *operatorapi.Sonata
d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-variable", "ProcessInstanceVariableDataEvent",
constants.KogitoProcessInstancesEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-definition", "ProcessDefinitionEvent",
constants.KogitoProcessDefinitionsEventsPath, platform),
d.newTrigger(lbl, brokerName, namespace, serviceName,
"process-instance-multiple", "MultipleProcessInstanceDataEvent",
constants.KogitoProcessInstancesMultiEventsPath, platform),
- d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs",
"JobEvent", constants.KogitoJobsPath, platform)}, nil
+ d.newTrigger(lbl, brokerName, namespace, serviceName, "jobs",
"JobEvent", constants.KogitoJobsPath, platform)}, nil, nil
}
func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
@@ -621,7 +627,7 @@ func (d JobServiceHandler) GetSink() *duckv1.Destination {
return GetPlatformBroker(d.platform)
}
-func (j *JobServiceHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
error) {
+func (j *JobServiceHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
*corev1.Event, error) {
broker := j.GetSourceBroker()
sink := j.GetSink()
resultObjs := []client.Object{}
@@ -633,7 +639,12 @@ func (j *JobServiceHandler)
GenerateKnativeResources(platform *operatorapi.Sonat
namespace = platform.Namespace
}
if err := knative.ValidateBroker(brokerName, namespace); err !=
nil {
- return nil, err
+ event := &corev1.Event{
+ Type: corev1.EventTypeWarning,
+ Reason: WaitingKnativeEventing,
+ Message: fmt.Sprintf("%s for service: %s",
err.Error(), j.GetServiceName()),
+ }
+ return nil, event, err
}
jobCreateTrigger := &eventingv1.Trigger{
ObjectMeta: metav1.ObjectMeta{
@@ -713,7 +724,7 @@ func (j *JobServiceHandler)
GenerateKnativeResources(platform *operatorapi.Sonat
}
resultObjs = append(resultObjs, sinkBinding)
}
- return resultObjs, nil
+ return resultObjs, nil, nil
}
func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) {
diff --git a/internal/controller/platform/warm.go b/internal/controller/platform/warm.go
index b76ff5f6..b9515507 100644
--- a/internal/controller/platform/warm.go
+++ b/internal/controller/platform/warm.go
@@ -54,7 +54,7 @@ func (action *warmAction) CanHandle(platform
*operatorapi.SonataFlowPlatform) bo
return platform.Status.IsWarming()
}
-func (action *warmAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
+func (action *warmAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform,
*corev1.Event, error) {
// Check Kaniko warmer pod status
pod := corev1.Pod{
TypeMeta: metav1.TypeMeta{
@@ -69,19 +69,19 @@ func (action *warmAction) Handle(ctx context.Context,
platform *operatorapi.Sona
err := action.reader.Get(ctx, types.NamespacedName{Namespace:
pod.Namespace, Name: pod.Name}, &pod)
if err != nil {
- return nil, err
+ return nil, nil, err
}
switch pod.Status.Phase {
case corev1.PodSucceeded:
klog.V(log.D).InfoS("Kaniko cache successfully warmed up")
platform.Status.Manager().MarkTrueWithReason(api.SucceedConditionType,
operatorapi.PlatformWarmingReason, "Kaniko cache successfully warmed up")
- return platform, nil
+ return platform, nil, nil
case corev1.PodFailed:
- return nil, errors.New("failed to warm up Kaniko cache")
+ return nil, nil, errors.New("failed to warm up Kaniko cache")
default:
klog.V(log.I).InfoS("Waiting for Kaniko cache to warm up...")
// Requeue
- return nil, nil
+ return nil, nil, nil
}
}
diff --git a/internal/controller/sonataflowplatform_controller.go b/internal/controller/sonataflowplatform_controller.go
index 6126e165..195fb091 100644
--- a/internal/controller/sonataflowplatform_controller.go
+++ b/internal/controller/sonataflowplatform_controller.go
@@ -28,8 +28,6 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/monitoring"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
- "k8s.io/klog/v2"
-
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
clientr
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
@@ -46,6 +44,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
+ "k8s.io/klog/v2"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
ctrlrun "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -155,7 +154,10 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx
context.Context, req reconc
klog.V(log.I).InfoS("Invoking action", "Name", a.Name())
- target, err = a.Handle(ctx, target)
+ target, event, err := a.Handle(ctx, target)
+ if event != nil {
+ r.Recorder.Event(&instance, event.Type,
event.Reason, event.Message)
+ }
if err != nil {
if target != nil {
target.Status.Manager().MarkFalse(api.SucceedConditionType,
operatorapi.PlatformFailureReason, err.Error())
diff --git a/internal/controller/sonataflowplatform_controller_test.go b/internal/controller/sonataflowplatform_controller_test.go
index dbadd2e8..af9280a3 100644
--- a/internal/controller/sonataflowplatform_controller_test.go
+++ b/internal/controller/sonataflowplatform_controller_test.go
@@ -886,7 +886,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
},
}
_, err := r.Reconcile(context.TODO(), req)
- if err != nil && err.Error() != "waiting for K_SINK injection
for sonataflow-platform-jobs-service to complete" {
+ if err != nil && err.Error() != "waiting for K_SINK injection
for service sonataflow-platform-jobs-service to complete" {
t.Fatalf("reconcile: (%v)", err)
}
@@ -987,7 +987,7 @@ func TestSonataFlowPlatformController(t *testing.T) {
},
}
_, err := r.Reconcile(context.TODO(), req)
- if err != nil && err.Error() != "waiting for K_SINK injection
for sonataflow-platform-jobs-service to complete" {
+ if err != nil && err.Error() != "waiting for K_SINK injection
for service sonataflow-platform-jobs-service to complete" {
t.Fatalf("reconcile: (%v)", err)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]