This is an automated email from the ASF dual-hosted git repository.
wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git
The following commit(s) were added to refs/heads/main by this push:
new 60e394abed1 [sonataflow-operator] Knative outdated revisions are not
cleaned-up for workflows with the knative deploymentModel and gitops profile
(#3049)
60e394abed1 is described below
commit 60e394abed13c34573110bdf6680ccffd25bcecd
Author: Walter Medvedeo <[email protected]>
AuthorDate: Tue Apr 8 08:41:08 2025 +0200
[sonataflow-operator] Knative outdated revisions are not cleaned-up for
workflows with the knative deploymentModel and gitops profile (#3049)
---
.../internal/controller/knative/knative.go | 85 +++++++++++++++++++++
.../controller/profiles/gitops/states_gitops.go | 8 +-
.../controller/profiles/preview/states_preview.go | 86 +---------------------
3 files changed, 96 insertions(+), 83 deletions(-)
diff --git
a/packages/sonataflow-operator/internal/controller/knative/knative.go
b/packages/sonataflow-operator/internal/controller/knative/knative.go
index 1e004c9330b..492fdc8380b 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -22,8 +22,15 @@ package knative
import (
"context"
"fmt"
+ "sort"
"strings"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/klog/v2"
+ servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"
@@ -71,6 +78,7 @@ const (
knativeSinkProvided = "SinkProvided"
KafkaKnativeEventingDeliveryOrder =
"kafka.eventing.knative.dev/delivery.order"
KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
+ workflowContainer = "workflow"
)
// noOpTracker no operations tracker for querying operations based on
resolver.URIResolver, that don't require any
@@ -351,3 +359,80 @@ func GetSinkURI(destination duckv1.Destination)
(*apis.URL, error) {
return url, nil
}
}
+
+// CleanupOutdatedRevisions Given a deployed workflow, analyses if the
configured deployment model is knative.
+// If that is the case, and the corresponding SinkBinding was created and
properly injected, all the previous knative
+// service revisions that weren't properly initialized (i.e. doesn't have the
K_SINK injected) will be cleaned-up.
+// Note that revisions in that situation are not valid, since workflows
without the K_SINK injected will never pass
+// the health checks, etc.
+func CleanupOutdatedRevisions(ctx context.Context, cfg *rest.Config, workflow
*operatorapi.SonataFlow) error {
+ if !workflow.IsKnativeDeployment() {
+ return nil
+ }
+ avail, err := GetKnativeAvailability(cfg)
+ if err != nil {
+ return err
+ }
+ if !avail.Serving || !avail.Eventing {
+ return nil
+ }
+ injected, err := CheckKSinkInjected(workflow.Name, workflow.Namespace)
+ if err != nil {
+ return err
+ }
+ if !injected {
+ return fmt.Errorf("waiting for Sinkbinding K_SINK injection to
complete")
+ }
+ opts := &client.ListOptions{
+ LabelSelector: labels.SelectorFromSet(
+ map[string]string{
+ workflowproj.LabelWorkflow:
workflow.Name,
+ workflowproj.LabelWorkflowNamespace:
workflow.Namespace,
+ },
+ ),
+ Namespace: workflow.Namespace,
+ }
+ revisionList := &servingv1.RevisionList{}
+ if err := utils.GetClient().List(ctx, revisionList, opts); err != nil {
+ return err
+ }
+ // Sort the revisions based on creation timestamp
+ sortRevisions(revisionList.Items)
+ // Clean up previous revisions that do not have K_SINK injected
+ for i := 0; i < len(revisionList.Items)-1; i++ {
+ revision := &revisionList.Items[i]
+ if !containsKSink(revision) {
+ klog.V(log.I).InfoS("Revision %s does not have K_SINK
injected and can be cleaned up.", revision.Name)
+ if err := utils.GetClient().Delete(ctx, revision,
&client.DeleteOptions{}); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func containsKSink(revision *servingv1.Revision) bool {
+ for _, container := range revision.Spec.PodSpec.Containers {
+ if container.Name == workflowContainer {
+ for _, env := range container.Env {
+ if env.Name == kSink {
+ return true
+ }
+ }
+ break
+ }
+ }
+ return false
+}
+
+type CreationTimestamp []servingv1.Revision
+
+func (a CreationTimestamp) Len() int { return len(a) }
+func (a CreationTimestamp) Less(i, j int) bool {
+ return a[i].CreationTimestamp.Before(&a[j].CreationTimestamp)
+}
+func (a CreationTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+
+func sortRevisions(revisions []servingv1.Revision) {
+ sort.Sort(CreationTimestamp(revisions))
+}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
b/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
index 0a220d2d875..62e5a820a6e 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/gitops/states_gitops.go
@@ -19,6 +19,9 @@ package gitops
import (
"context"
+ "fmt"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -69,6 +72,9 @@ func (f *followDeployWorkflowState) Do(ctx context.Context,
workflow *operatorap
}
func (f *followDeployWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
- //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ // Clean up the outdated Knative revisions, if any
+ if err := knative.CleanupOutdatedRevisions(ctx, f.Cfg, workflow); err
!= nil {
+ return fmt.Errorf("failied to cleanup workflow outdated
revisions, workflow: %s, namespace: %s - %v", workflow.Name,
workflow.Namespace, err)
+ }
return nil
}
diff --git
a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
index 55a50e5778b..e53bae1528e 100644
---
a/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
+++
b/packages/sonataflow-operator/internal/controller/profiles/preview/states_preview.go
@@ -22,12 +22,9 @@ package preview
import (
"context"
"fmt"
- "sort"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/labels"
- servingv1 "knative.dev/serving/pkg/apis/serving/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -43,12 +40,6 @@ import (
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
-
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/workflowproj"
-)
-
-const (
- kSink = "K_SINK"
- workflowContainer = "workflow"
)
type newBuilderState struct {
@@ -238,7 +229,10 @@ func (h *deployWithBuildWorkflowState) Do(ctx
context.Context, workflow *operato
func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
// Clean up the outdated Knative revisions, if any
- return h.cleanupOutdatedRevisions(ctx, workflow)
+ if err := knative.CleanupOutdatedRevisions(ctx, h.Cfg, workflow); err
!= nil {
+ return fmt.Errorf("failied to cleanup workflow outdated
revisions, workflow: %s, namespace: %s - %v", workflow.Name,
workflow.Namespace, err)
+ }
+ return nil
}
// isWorkflowChanged checks whether the contents of .spec.flow of the given
workflow has changed.
@@ -254,75 +248,3 @@ func (h *deployWithBuildWorkflowState)
isWorkflowChanged(workflow *operatorapi.S
}
return actualCRC != workflow.Status.FlowCRC, nil
}
-
-func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx
context.Context, workflow *operatorapi.SonataFlow) error {
- if !workflow.IsKnativeDeployment() {
- return nil
- }
- avail, err := knative.GetKnativeAvailability(h.Cfg)
- if err != nil {
- return err
- }
- if !avail.Serving || !avail.Eventing {
- return nil
- }
- injected, err := knative.CheckKSinkInjected(workflow.Name,
workflow.Namespace)
- if err != nil {
- return err
- }
- if !injected {
- return fmt.Errorf("waiting for Sinkbinding K_SINK injection to
complete")
- }
- opts := &client.ListOptions{
- LabelSelector: labels.SelectorFromSet(
- map[string]string{
- workflowproj.LabelWorkflow:
workflow.Name,
- workflowproj.LabelWorkflowNamespace:
workflow.Namespace,
- },
- ),
- Namespace: workflow.Namespace,
- }
- revisionList := &servingv1.RevisionList{}
- if err := h.C.List(ctx, revisionList, opts); err != nil {
- return err
- }
- // Sort the revisions based on creation timestamp
- sortRevisions(revisionList.Items)
- // Clean up previous revisions that do not have K_SINK injected
- for i := 0; i < len(revisionList.Items)-1; i++ {
- revision := &revisionList.Items[i]
- if !containsKSink(revision) {
- klog.V(log.I).InfoS("Revision %s does not have K_SINK
injected and can be cleaned up.", revision.Name)
- if err := h.C.Delete(ctx, revision,
&client.DeleteOptions{}); err != nil {
- return err
- }
- }
- }
- return nil
-}
-
-func containsKSink(revision *servingv1.Revision) bool {
- for _, container := range revision.Spec.PodSpec.Containers {
- if container.Name == workflowContainer {
- for _, env := range container.Env {
- if env.Name == kSink {
- return true
- }
- }
- break
- }
- }
- return false
-}
-
-type CreationTimestamp []servingv1.Revision
-
-func (a CreationTimestamp) Len() int { return len(a) }
-func (a CreationTimestamp) Less(i, j int) bool {
- return a[i].CreationTimestamp.Before(&a[j].CreationTimestamp)
-}
-func (a CreationTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-
-func sortRevisions(revisions []servingv1.Revision) {
- sort.Sort(CreationTimestamp(revisions))
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]