This is an automated email from the ASF dual-hosted git repository.

wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new 60e394abed1 [sonataflow-operator] Knative outdated revisions are not 
cleaned-up for workflows with the knative deploymentModel and gitops profile 
(#3049)
60e394abed1 is described below

commit 60e394abed13c34573110bdf6680ccffd25bcecd
Author: Walter Medvedeo <[email protected]>
AuthorDate: Tue Apr 8 08:41:08 2025 +0200

    [sonataflow-operator] Knative outdated revisions are not cleaned-up for 
workflows with the knative deploymentModel and gitops profile (#3049)
---
 .../internal/controller/knative/knative.go         | 85 +++++++++++++++++++++
 .../controller/profiles/gitops/states_gitops.go    |  8 +-
 .../controller/profiles/preview/states_preview.go  | 86 +---------------------
 3 files changed, 96 insertions(+), 83 deletions(-)

diff --git a/packages/sonataflow-operator/internal/controller/knative/knative.go b/packages/sonataflow-operator/internal/controller/knative/knative.go
index 1e004c9330b..492fdc8380b 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -22,8 +22,15 @@ package knative
 import (
        "context"
        "fmt"
+       "sort"
        "strings"
 
+       "k8s.io/apimachinery/pkg/labels"
+       "k8s.io/klog/v2"
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+
        "knative.dev/pkg/resolver"
 
        "knative.dev/pkg/tracker"
@@ -71,6 +78,7 @@ const (
        knativeSinkProvided                      = "SinkProvided"
        KafkaKnativeEventingDeliveryOrder        = 
"kafka.eventing.knative.dev/delivery.order"
        KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
+       workflowContainer                        = "workflow"
 )
 
 // noOpTracker no operations tracker for querying operations based on 
resolver.URIResolver, that don't require any
@@ -351,3 +359,80 @@ func GetSinkURI(destination duckv1.Destination) (*apis.URL, error) {
                return url, nil
        }
 }
+
+// CleanupOutdatedRevisions Given a deployed workflow, analyses if the configured deployment model is knative.
+// If that is the case, and the corresponding SinkBinding was created and properly injected, all the previous knative
+// service revisions that weren't properly initialized (i.e. don't have the K_SINK injected) will be cleaned-up.
+// Note that revisions in that situation are not valid, since workflows without the K_SINK injected will never pass
+// the health checks, etc.
+func CleanupOutdatedRevisions(ctx context.Context, cfg *rest.Config, workflow 
*operatorapi.SonataFlow) error {
+       if !workflow.IsKnativeDeployment() {
+               return nil
+       }
+       avail, err := GetKnativeAvailability(cfg)
+       if err != nil {
+               return err
+       }
+       if !avail.Serving || !avail.Eventing {
+               return nil
+       }
+       injected, err := CheckKSinkInjected(workflow.Name, workflow.Namespace)
+       if err != nil {
+               return err
+       }
+       if !injected {
+               return fmt.Errorf("waiting for Sinkbinding K_SINK injection to 
complete")
+       }
+       opts := &client.ListOptions{
+               LabelSelector: labels.SelectorFromSet(
+                       map[string]string{
+                               workflowproj.LabelWorkflow:          
workflow.Name,
+                               workflowproj.LabelWorkflowNamespace: 
workflow.Namespace,
+                       },
+               ),
+               Namespace: workflow.Namespace,
+       }
+       revisionList := &servingv1.RevisionList{}
+       if err := utils.GetClient().List(ctx, revisionList, opts); err != nil {
+               return err
+       }
+       // Sort the revisions based on creation timestamp
+       sortRevisions(revisionList.Items)
+       // Clean up previous revisions that do not have K_SINK injected
+       for i := 0; i < len(revisionList.Items)-1; i++ {
+               revision := &revisionList.Items[i]
+               if !containsKSink(revision) {
+                       klog.V(log.I).InfoS("Revision %s does not have K_SINK 
injected and can be cleaned up.", revision.Name)
+                       if err := utils.GetClient().Delete(ctx, revision, 
&client.DeleteOptions{}); err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+func containsKSink(revision *servingv1.Revision) bool {
+       for _, container := range revision.Spec.PodSpec.Containers {
+               if container.Name == workflowContainer {
+                       for _, env := range container.Env {
+                               if env.Name == kSink {
+                                       return true
+                               }
+                       }
+                       break
+               }
+       }
+       return false
+}
+
+type CreationTimestamp []servingv1.Revision
+
+func (a CreationTimestamp) Len() int { return len(a) }
+func (a CreationTimestamp) Less(i, j int) bool {
+       return a[i].CreationTimestamp.Before(&a[j].CreationTimestamp)
+}
+func (a CreationTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+
+func sortRevisions(revisions []servingv1.Revision) {
+       sort.Sort(CreationTimestamp(revisions))
+}
diff --git a/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go b/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
index 0a220d2d875..62e5a820a6e 100644
--- a/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
+++ b/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
@@ -19,6 +19,9 @@ package gitops
 
 import (
        "context"
+       "fmt"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
 
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
@@ -69,6 +72,9 @@ func (f *followDeployWorkflowState) Do(ctx context.Context, workflow *operatorap
 }
 
 func (f *followDeployWorkflowState) PostReconcile(ctx context.Context, 
workflow *operatorapi.SonataFlow) error {
-       //By default, we don't want to perform anything after the 
reconciliation, and so we will simply return no error
+       // Clean up the outdated Knative revisions, if any
+       if err := knative.CleanupOutdatedRevisions(ctx, f.Cfg, workflow); err 
!= nil {
+               return fmt.Errorf("failied to cleanup workflow outdated 
revisions, workflow: %s, namespace: %s - %v", workflow.Name, 
workflow.Namespace, err)
+       }
        return nil
 }
diff --git a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
index 55a50e5778b..e53bae1528e 100644
--- a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
+++ b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
@@ -22,12 +22,9 @@ package preview
 import (
        "context"
        "fmt"
-       "sort"
 
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/apimachinery/pkg/labels"
-       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -43,12 +40,6 @@ import (
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common"
        
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
        "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
-       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/workflowproj"
-)
-
-const (
-       kSink             = "K_SINK"
-       workflowContainer = "workflow"
 )
 
 type newBuilderState struct {
@@ -238,7 +229,10 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
 
 func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, 
workflow *operatorapi.SonataFlow) error {
        // Clean up the outdated Knative revisions, if any
-       return h.cleanupOutdatedRevisions(ctx, workflow)
+       if err := knative.CleanupOutdatedRevisions(ctx, h.Cfg, workflow); err 
!= nil {
+               return fmt.Errorf("failied to cleanup workflow outdated 
revisions, workflow: %s, namespace: %s - %v", workflow.Name, 
workflow.Namespace, err)
+       }
+       return nil
 }
 
 // isWorkflowChanged checks whether the contents of .spec.flow of the given 
workflow has changed.
@@ -254,75 +248,3 @@ func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.S
        }
        return actualCRC != workflow.Status.FlowCRC, nil
 }
-
-func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx 
context.Context, workflow *operatorapi.SonataFlow) error {
-       if !workflow.IsKnativeDeployment() {
-               return nil
-       }
-       avail, err := knative.GetKnativeAvailability(h.Cfg)
-       if err != nil {
-               return err
-       }
-       if !avail.Serving || !avail.Eventing {
-               return nil
-       }
-       injected, err := knative.CheckKSinkInjected(workflow.Name, 
workflow.Namespace)
-       if err != nil {
-               return err
-       }
-       if !injected {
-               return fmt.Errorf("waiting for Sinkbinding K_SINK injection to 
complete")
-       }
-       opts := &client.ListOptions{
-               LabelSelector: labels.SelectorFromSet(
-                       map[string]string{
-                               workflowproj.LabelWorkflow:          
workflow.Name,
-                               workflowproj.LabelWorkflowNamespace: 
workflow.Namespace,
-                       },
-               ),
-               Namespace: workflow.Namespace,
-       }
-       revisionList := &servingv1.RevisionList{}
-       if err := h.C.List(ctx, revisionList, opts); err != nil {
-               return err
-       }
-       // Sort the revisions based on creation timestamp
-       sortRevisions(revisionList.Items)
-       // Clean up previous revisions that do not have K_SINK injected
-       for i := 0; i < len(revisionList.Items)-1; i++ {
-               revision := &revisionList.Items[i]
-               if !containsKSink(revision) {
-                       klog.V(log.I).InfoS("Revision %s does not have K_SINK 
injected and can be cleaned up.", revision.Name)
-                       if err := h.C.Delete(ctx, revision, 
&client.DeleteOptions{}); err != nil {
-                               return err
-                       }
-               }
-       }
-       return nil
-}
-
-func containsKSink(revision *servingv1.Revision) bool {
-       for _, container := range revision.Spec.PodSpec.Containers {
-               if container.Name == workflowContainer {
-                       for _, env := range container.Env {
-                               if env.Name == kSink {
-                                       return true
-                               }
-                       }
-                       break
-               }
-       }
-       return false
-}
-
-type CreationTimestamp []servingv1.Revision
-
-func (a CreationTimestamp) Len() int { return len(a) }
-func (a CreationTimestamp) Less(i, j int) bool {
-       return a[i].CreationTimestamp.Before(&a[j].CreationTimestamp)
-}
-func (a CreationTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-
-func sortRevisions(revisions []servingv1.Revision) {
-       sort.Sort(CreationTimestamp(revisions))
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to