This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 5dae96cd [issue-562] Serverless workflow pod gets restarted repeatedly
after Knative K_SINK injection (#565)
5dae96cd is described below
commit 5dae96cdccb8a7c294e0adbfcae76ba37c421618
Author: Jianrong Zhang <[email protected]>
AuthorDate: Fri Nov 8 11:33:27 2024 -0500
[issue-562] Serverless workflow pod gets restarted repeatedly after Knative
K_SINK injection (#565)
---
config/rbac/role.yaml | 8 ++
.../controller/profiles/common/knative_eventing.go | 2 +-
.../controller/profiles/preview/states_preview.go | 86 +++++++++++++++++++++-
internal/controller/sonataflow_controller.go | 1 +
operator.yaml | 8 ++
5 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index ecba276f..bfeba82c 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -32,6 +32,14 @@ rules:
- list
- update
- watch
+- apiGroups:
+ - serving.knative.dev
+ resources:
+ - revisions
+ verbs:
+ - delete
+ - list
+ - watch
- apiGroups:
- sonataflow.org
resources:
diff --git a/internal/controller/profiles/common/knative_eventing.go
b/internal/controller/profiles/common/knative_eventing.go
index 7838138d..dcb1faad 100644
--- a/internal/controller/profiles/common/knative_eventing.go
+++ b/internal/controller/profiles/common/knative_eventing.go
@@ -20,9 +20,9 @@ package common
import (
"context"
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"k8s.io/klog/v2"
- operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"sigs.k8s.io/controller-runtime/pkg/client"
diff --git a/internal/controller/profiles/preview/states_preview.go
b/internal/controller/profiles/preview/states_preview.go
index c108ee82..c5480d44 100644
--- a/internal/controller/profiles/preview/states_preview.go
+++ b/internal/controller/profiles/preview/states_preview.go
@@ -22,21 +22,31 @@ package preview
import (
"context"
"fmt"
+ "sort"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
+ servingv1 "knative.dev/serving/pkg/apis/serving/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/builder"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
kubeutil
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
+)
+
+const (
+ kSink = "K_SINK"
+ workflowContainer = "workflow"
)
type newBuilderState struct {
@@ -221,8 +231,8 @@ func (h *deployWithBuildWorkflowState) Do(ctx
context.Context, workflow *operato
}
func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
- //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
- return nil
+ // Clean up the outdated Knative revisions, if any
+ return h.cleanupOutdatedRevisions(ctx, workflow)
}
// isWorkflowChanged marks the workflow status as unknown to require a new
build reconciliation
@@ -233,3 +243,75 @@ func (h *deployWithBuildWorkflowState)
isWorkflowChanged(workflow *operatorapi.S
}
return false
}
+
+func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx
context.Context, workflow *operatorapi.SonataFlow) error {
+ if !workflow.IsKnativeDeployment() {
+ return nil
+ }
+ avail, err := knative.GetKnativeAvailability(h.Cfg)
+ if err != nil {
+ return err
+ }
+ if !avail.Serving || !avail.Eventing {
+ return nil
+ }
+ injected, err := knative.CheckKSinkInjected(workflow.Name,
workflow.Namespace)
+ if err != nil {
+ return err
+ }
+ if !injected {
+ return fmt.Errorf("waiting for Sinkbinding K_SINK injection to
complete")
+ }
+ opts := &client.ListOptions{
+ LabelSelector: labels.SelectorFromSet(
+ map[string]string{
+ workflowproj.LabelWorkflow:
workflow.Name,
+ workflowproj.LabelWorkflowNamespace:
workflow.Namespace,
+ },
+ ),
+ Namespace: workflow.Namespace,
+ }
+ revisionList := &servingv1.RevisionList{}
+ if err := h.C.List(ctx, revisionList, opts); err != nil {
+ return err
+ }
+ // Sort the revisions based on creation timestamp
+ sortRevisions(revisionList.Items)
+ // Clean up previous revisions that do not have K_SINK injected
+ for i := 0; i < len(revisionList.Items)-1; i++ {
+ revision := &revisionList.Items[i]
+ if !containsKSink(revision) {
+ klog.V(log.I).InfoS("Revision %s does not have K_SINK
injected and can be cleaned up.", revision.Name)
+ if err := h.C.Delete(ctx, revision,
&client.DeleteOptions{}); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// containsKSink reports whether the first container named "workflow" in the
+// revision's pod spec declares an environment variable named K_SINK. Only that
+// first matching container is inspected; a revision without such a container
+// yields false.
+func containsKSink(revision *servingv1.Revision) bool {
+	containers := revision.Spec.PodSpec.Containers
+	for i := range containers {
+		if containers[i].Name != workflowContainer {
+			continue
+		}
+		// Workflow container found: the result depends solely on its env list.
+		for _, envVar := range containers[i].Env {
+			if envVar.Name == kSink {
+				return true
+			}
+		}
+		return false
+	}
+	return false
+}
+
+// CreationTimestamp adapts a slice of Knative revisions to sort.Interface,
+// ordering them from the earliest to the latest creation timestamp.
+type CreationTimestamp []servingv1.Revision
+
+// Len returns the number of revisions in the slice.
+func (c CreationTimestamp) Len() int {
+	return len(c)
+}
+
+// Less reports whether the revision at index i was created before the one at index j.
+func (c CreationTimestamp) Less(i, j int) bool {
+	left, right := &c[i].CreationTimestamp, &c[j].CreationTimestamp
+	return left.Before(right)
+}
+
+// Swap exchanges the revisions at indexes i and j.
+func (c CreationTimestamp) Swap(i, j int) {
+	c[i], c[j] = c[j], c[i]
+}
+
+// sortRevisions reorders the given revisions in place using the
+// CreationTimestamp ordering.
+func sortRevisions(revisions []servingv1.Revision) {
+	byCreation := CreationTimestamp(revisions)
+	sort.Sort(byCreation)
+}
diff --git a/internal/controller/sonataflow_controller.go
b/internal/controller/sonataflow_controller.go
index 7724a637..2fbc8b33 100644
--- a/internal/controller/sonataflow_controller.go
+++ b/internal/controller/sonataflow_controller.go
@@ -71,6 +71,7 @@ type SonataFlowReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
+//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
diff --git a/operator.yaml b/operator.yaml
index 977f7904..b45f2fe7 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -27746,6 +27746,14 @@ rules:
- list
- update
- watch
+- apiGroups:
+ - serving.knative.dev
+ resources:
+ - revisions
+ verbs:
+ - delete
+ - list
+ - watch
- apiGroups:
- sonataflow.org
resources:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]