This is an automated email from the ASF dual-hosted git repository.

ricardozanini pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dae96cd [issue-562] Serverless workflow pod gets restarted repeatedly 
after Knative K_SINK injection (#565)
5dae96cd is described below

commit 5dae96cdccb8a7c294e0adbfcae76ba37c421618
Author: Jianrong Zhang <[email protected]>
AuthorDate: Fri Nov 8 11:33:27 2024 -0500

    [issue-562] Serverless workflow pod gets restarted repeatedly after Knative 
K_SINK injection (#565)
---
 config/rbac/role.yaml                              |  8 ++
 .../controller/profiles/common/knative_eventing.go |  2 +-
 .../controller/profiles/preview/states_preview.go  | 86 +++++++++++++++++++++-
 internal/controller/sonataflow_controller.go       |  1 +
 operator.yaml                                      |  8 ++
 5 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index ecba276f..bfeba82c 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -32,6 +32,14 @@ rules:
   - list
   - update
   - watch
+- apiGroups:
+  - serving.knative.dev
+  resources:
+  - revisions
+  verbs:
+  - delete
+  - list
+  - watch
 - apiGroups:
   - sonataflow.org
   resources:
diff --git a/internal/controller/profiles/common/knative_eventing.go 
b/internal/controller/profiles/common/knative_eventing.go
index 7838138d..dcb1faad 100644
--- a/internal/controller/profiles/common/knative_eventing.go
+++ b/internal/controller/profiles/common/knative_eventing.go
@@ -20,9 +20,9 @@ package common
 import (
        "context"
 
+       operatorapi 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
        "k8s.io/klog/v2"
 
-       operatorapi 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
        
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
        "github.com/apache/incubator-kie-kogito-serverless-operator/log"
        "sigs.k8s.io/controller-runtime/pkg/client"
diff --git a/internal/controller/profiles/preview/states_preview.go 
b/internal/controller/profiles/preview/states_preview.go
index c108ee82..c5480d44 100644
--- a/internal/controller/profiles/preview/states_preview.go
+++ b/internal/controller/profiles/preview/states_preview.go
@@ -22,21 +22,31 @@ package preview
 import (
        "context"
        "fmt"
+       "sort"
 
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/labels"
        "k8s.io/klog/v2"
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
 
        "github.com/apache/incubator-kie-kogito-serverless-operator/api"
        operatorapi 
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
        
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/builder"
+       
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
        
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform"
        
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
        
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
        "github.com/apache/incubator-kie-kogito-serverless-operator/log"
        kubeutil 
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
+       
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
+)
+
+const (
+       kSink             = "K_SINK"
+       workflowContainer = "workflow"
 )
 
 type newBuilderState struct {
@@ -221,8 +231,8 @@ func (h *deployWithBuildWorkflowState) Do(ctx 
context.Context, workflow *operato
 }
 
 func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, 
workflow *operatorapi.SonataFlow) error {
-       //By default, we don't want to perform anything after the 
reconciliation, and so we will simply return no error
-       return nil
+       // Clean up the outdated Knative revisions, if any
+       return h.cleanupOutdatedRevisions(ctx, workflow)
 }
 
 // isWorkflowChanged marks the workflow status as unknown to require a new 
build reconciliation
@@ -233,3 +243,75 @@ func (h *deployWithBuildWorkflowState) 
isWorkflowChanged(workflow *operatorapi.S
        }
        return false
 }
+
+func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx 
context.Context, workflow *operatorapi.SonataFlow) error {
+       if !workflow.IsKnativeDeployment() {
+               return nil
+       }
+       avail, err := knative.GetKnativeAvailability(h.Cfg)
+       if err != nil {
+               return err
+       }
+       if !avail.Serving || !avail.Eventing {
+               return nil
+       }
+       injected, err := knative.CheckKSinkInjected(workflow.Name, 
workflow.Namespace)
+       if err != nil {
+               return err
+       }
+       if !injected {
+               return fmt.Errorf("waiting for Sinkbinding K_SINK injection to 
complete")
+       }
+       opts := &client.ListOptions{
+               LabelSelector: labels.SelectorFromSet(
+                       map[string]string{
+                               workflowproj.LabelWorkflow:          
workflow.Name,
+                               workflowproj.LabelWorkflowNamespace: 
workflow.Namespace,
+                       },
+               ),
+               Namespace: workflow.Namespace,
+       }
+       revisionList := &servingv1.RevisionList{}
+       if err := h.C.List(ctx, revisionList, opts); err != nil {
+               return err
+       }
+       // Sort the revisions based on creation timestamp
+       sortRevisions(revisionList.Items)
+       // Clean up previous revisions that do not have K_SINK injected
+       for i := 0; i < len(revisionList.Items)-1; i++ {
+               revision := &revisionList.Items[i]
+               if !containsKSink(revision) {
+                       klog.V(log.I).InfoS("Revision %s does not have K_SINK 
injected and can be cleaned up.", revision.Name)
+                       if err := h.C.Delete(ctx, revision, 
&client.DeleteOptions{}); err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+// containsKSink reports whether the container named "workflow" in the revision's
+// pod spec declares the K_SINK environment variable. Only the first container
+// with that name is inspected; a revision without such a container yields false.
+func containsKSink(revision *servingv1.Revision) bool {
+       containers := revision.Spec.PodSpec.Containers
+       for i := range containers {
+               if containers[i].Name != workflowContainer {
+                       continue
+               }
+               for _, envVar := range containers[i].Env {
+                       if envVar.Name == kSink {
+                               return true
+                       }
+               }
+               // The workflow container was found but has no K_SINK; stop searching.
+               return false
+       }
+       return false
+}
+
+// CreationTimestamp implements sort.Interface for a slice of Knative revisions,
+// ordering them from the earliest to the latest creation timestamp.
+type CreationTimestamp []servingv1.Revision
+
+// Len returns the number of revisions in the slice.
+func (a CreationTimestamp) Len() int {
+       return len(a)
+}
+
+// Less reports whether the revision at index i was created before the one at index j.
+func (a CreationTimestamp) Less(i, j int) bool {
+       later := &a[j].CreationTimestamp
+       return a[i].CreationTimestamp.Before(later)
+}
+
+// Swap exchanges the revisions at indexes i and j.
+func (a CreationTimestamp) Swap(i, j int) {
+       a[j], a[i] = a[i], a[j]
+}
+
+// sortRevisions sorts the given revisions in place by ascending creation timestamp.
+func sortRevisions(revisions []servingv1.Revision) {
+       byCreation := CreationTimestamp(revisions)
+       sort.Sort(byCreation)
+}
diff --git a/internal/controller/sonataflow_controller.go 
b/internal/controller/sonataflow_controller.go
index 7724a637..2fbc8b33 100644
--- a/internal/controller/sonataflow_controller.go
+++ b/internal/controller/sonataflow_controller.go
@@ -71,6 +71,7 @@ type SonataFlowReconciler struct {
 
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch
 
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
 
//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
+//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
diff --git a/operator.yaml b/operator.yaml
index 977f7904..b45f2fe7 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -27746,6 +27746,14 @@ rules:
   - list
   - update
   - watch
+- apiGroups:
+  - serving.knative.dev
+  resources:
+  - revisions
+  verbs:
+  - delete
+  - list
+  - watch
 - apiGroups:
   - sonataflow.org
   resources:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to