This is an automated email from the ASF dual-hosted git repository.
wmedvedeo pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c4299537 Fix #560 - Check if the .spec.flow has changed before building (#564)
c4299537 is described below
commit c429953742110b2002317306f94f943e9bc72fab
Author: Ricardo Zanini <[email protected]>
AuthorDate: Thu Nov 14 04:09:11 2024 -0500
Fix #560 - Check if the .spec.flow has changed before building (#564)
* Fix #560 - Check if the .spec.flow has changed before building
Signed-off-by: Ricardo Zanini <[email protected]>
* Use CRC32 instead of comparing flows directly
Signed-off-by: Ricardo Zanini <[email protected]>
---------
Signed-off-by: Ricardo Zanini <[email protected]>
---
api/v1alpha08/sonataflow_types.go | 2 +
bundle/manifests/sonataflow.org_sonataflows.yaml | 3 ++
config/crd/bases/sonataflow.org_sonataflows.yaml | 3 ++
.../sonataflow-operator.clusterserviceversion.yaml | 2 +
internal/controller/profiles/common/reconciler.go | 6 +++
.../profiles/preview/profile_preview_test.go | 13 +++---
.../controller/profiles/preview/states_preview.go | 25 ++++++----
.../profiles/preview/states_preview_test.go | 54 ++++++++++++++++++++++
operator.yaml | 3 ++
test/yaml.go | 3 ++
utils/crc.go | 33 +++++++++++++
utils/kubernetes/annotations.go | 22 ---------
12 files changed, 132 insertions(+), 37 deletions(-)
diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go
index 01452cc4..3371e516 100644
--- a/api/v1alpha08/sonataflow_types.go
+++ b/api/v1alpha08/sonataflow_types.go
@@ -200,6 +200,8 @@ type SonataFlowStatus struct {
// Triggers list of triggers created for the SonataFlow
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
+ //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
+ FlowCRC uint32 `json:"flowCRC,omitempty"`
}
// SonataFlowTriggerRef defines a trigger created for the SonataFlow.
diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml b/bundle/manifests/sonataflow.org_sonataflows.yaml
index 911f64e2..44d56859 100644
--- a/bundle/manifests/sonataflow.org_sonataflows.yaml
+++ b/bundle/manifests/sonataflow.org_sonataflows.yaml
@@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the
workflow
type: string
+ flowCRC:
+ format: int32
+ type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml b/config/crd/bases/sonataflow.org_sonataflows.yaml
index aed84f8c..36be93c8 100644
--- a/config/crd/bases/sonataflow.org_sonataflows.yaml
+++ b/config/crd/bases/sonataflow.org_sonataflows.yaml
@@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the
workflow
type: string
+ flowCRC:
+ format: int32
+ type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
index df687131..0aea8f40 100644
--- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
+++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml
@@ -239,6 +239,8 @@ spec:
- description: Endpoint is an externally accessible URL of the workflow
displayName: endpoint
path: endpoint
+ - displayName: flowRevision
+ path: flowCRC
- displayName: lastTimeRecoverAttempt
path: lastTimeRecoverAttempt
- description: Platform displays which platform is being used by this
workflow
diff --git a/internal/controller/profiles/common/reconciler.go b/internal/controller/profiles/common/reconciler.go
index dba18439..82637c4d 100644
--- a/internal/controller/profiles/common/reconciler.go
+++ b/internal/controller/profiles/common/reconciler.go
@@ -23,6 +23,8 @@ import (
"context"
"fmt"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+
"k8s.io/client-go/rest"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/discovery"
@@ -56,6 +58,10 @@ func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operat
return false, err
}
workflow.Status.ObservedGeneration = workflow.Generation
+ workflow.Status.FlowCRC, err = utils.Crc32Checksum(workflow.Spec.Flow)
+ if err != nil {
+ return false, err
+ }
services.SetServiceUrlsInWorkflowStatus(pl, workflow)
if workflow.Status.Platform == nil {
workflow.Status.Platform = &operatorapi.SonataFlowPlatformRef{}
diff --git a/internal/controller/profiles/preview/profile_preview_test.go b/internal/controller/profiles/preview/profile_preview_test.go
index aed417bd..99c1677b 100644
--- a/internal/controller/profiles/preview/profile_preview_test.go
+++ b/internal/controller/profiles/preview/profile_preview_test.go
@@ -181,7 +181,7 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Path, "/q/metrics")
}
-func Test_GenerationAnnotationCheck(t *testing.T) {
+func Test_WorkflowChangedCheck(t *testing.T) {
// we load a workflow with metadata.generation to 0
workflow := test.GetBaseSonataFlow(t.Name())
platform := test.GetBasePlatformInReadyPhase(t.Name())
@@ -199,15 +199,14 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
assert.NotNil(t, result)
assert.Len(t, objects, 3)
- // then we load a workflow with metadata.generation set to 1
+ // then we load the current workflow
workflowChanged := &operatorapi.SonataFlow{}
err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), workflowChanged)
assert.NoError(t, err)
- //we set the generation to 1
- workflowChanged.Generation = int64(1)
- err = client.Update(context.TODO(), workflowChanged)
- assert.NoError(t, err)
- // reconcile
+ //we change something within the flow
+ workflowChanged.Spec.Flow.AutoRetries = true
+
+ // reconcile -> the one in the k8s DB is different, so there's a change.
handler = &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: NewObjectEnsurers(&common.StateSupport{C: client}),
diff --git a/internal/controller/profiles/preview/states_preview.go b/internal/controller/profiles/preview/states_preview.go
index c5480d44..88031a7a 100644
--- a/internal/controller/profiles/preview/states_preview.go
+++ b/internal/controller/profiles/preview/states_preview.go
@@ -24,6 +24,7 @@ import (
"fmt"
"sort"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
@@ -40,7 +41,6 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
- kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
)
@@ -209,7 +209,11 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
return ctrl.Result{}, nil, err
}
- if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different
+ hasChanged, err := h.isWorkflowChanged(workflow)
+ if err != nil {
+ return ctrl.Result{}, nil, err
+ }
+ if hasChanged { // Let's check that the 2 resWorkflowDef definition are different
if err = buildManager.MarkToRestart(build); err != nil {
return ctrl.Result{}, nil, err
}
@@ -235,13 +239,18 @@ func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workfl
return h.cleanupOutdatedRevisions(ctx, workflow)
}
-// isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation
-func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) bool {
- generation := kubeutil.GetLastGeneration(workflow.Namespace, workflow.Name, h.C, context.TODO())
- if generation > workflow.Status.ObservedGeneration {
- return true
+// isWorkflowChanged checks whether the contents of .spec.flow of the given workflow has changed.
+func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) (bool, error) {
+ // Added this guard for backward compatibility for workflows deployed with a previous operator version, so we won't kick thousands of builds on users' cluster.
+ // After this reconciliation cycle, the CRC should be updated
+ if workflow.Status.FlowCRC == 0 {
+ return false, nil
}
- return false
+ actualCRC, err := utils.Crc32Checksum(workflow.Spec.Flow)
+ if err != nil {
+ return false, err
+ }
+ return actualCRC != workflow.Status.FlowCRC, nil
}
func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error {
diff --git a/internal/controller/profiles/preview/states_preview_test.go b/internal/controller/profiles/preview/states_preview_test.go
new file mode 100644
index 00000000..43a26de9
--- /dev/null
+++ b/internal/controller/profiles/preview/states_preview_test.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package preview
+
+import (
+ "testing"
+
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+
+ "github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/test"
+ "github.com/serverlessworkflow/sdk-go/v2/model"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
+ workflow1 := test.GetBaseSonataFlow(t.Name())
+ workflow2 := test.GetBaseSonataFlow(t.Name())
+ workflow1.Status.FlowCRC, _ = utils.Crc32Checksum(workflow1.Spec.Flow)
+ workflow2.Status.FlowCRC, _ = utils.Crc32Checksum(workflow2.Spec.Flow)
+ deployWithBuildWorkflowState := &deployWithBuildWorkflowState{
+ StateSupport: &common.StateSupport{C: test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow1).Build()},
+ }
+
+ hasChanged, err := deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
+ assert.NoError(t, err)
+ assert.False(t, hasChanged)
+
+ // change workflow2
+ workflow2.Spec.Flow.Metadata = model.Metadata{
+ "string": model.Object{
+ StringValue: "test",
+ },
+ }
+
+ hasChanged, err = deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
+ assert.NoError(t, err)
+ assert.True(t, hasChanged)
+}
diff --git a/operator.yaml b/operator.yaml
index b45f2fe7..88fa1682 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -27477,6 +27477,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the
workflow
type: string
+ flowCRC:
+ format: int32
+ type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
diff --git a/test/yaml.go b/test/yaml.go
index 752f0562..6f2ffa95 100644
--- a/test/yaml.go
+++ b/test/yaml.go
@@ -27,6 +27,8 @@ import (
"runtime"
"strings"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
@@ -71,6 +73,7 @@ func GetSonataFlow(testFile, namespace string) *operatorapi.SonataFlow {
GetKubernetesResource(testFile, ksw)
klog.V(log.D).InfoS("Successfully read KSW", "ksw", spew.Sprint(ksw))
ksw.Namespace = namespace
+ ksw.Status.FlowCRC, _ = utils.Crc32Checksum(ksw.Spec.Flow)
return ksw
}
diff --git a/utils/crc.go b/utils/crc.go
new file mode 100644
index 00000000..a262f3ee
--- /dev/null
+++ b/utils/crc.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package utils
+
+import (
+ "bytes"
+ "encoding/gob"
+ "hash/crc32"
+)
+
+func Crc32Checksum(v interface{}) (uint32, error) {
+ var buf bytes.Buffer
+ enc := gob.NewEncoder(&buf)
+ if err := enc.Encode(v); err != nil {
+ return 0, err
+ }
+ return crc32.ChecksumIEEE(buf.Bytes()), nil
+}
diff --git a/utils/kubernetes/annotations.go b/utils/kubernetes/annotations.go
index 24d9ba4a..91d4927a 100644
--- a/utils/kubernetes/annotations.go
+++ b/utils/kubernetes/annotations.go
@@ -20,33 +20,11 @@
package kubernetes
import (
- "context"
"strconv"
- "k8s.io/klog/v2"
-
"sigs.k8s.io/controller-runtime/pkg/client"
-
- operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
- "github.com/apache/incubator-kie-kogito-serverless-operator/log"
)
-func getWorkflow(namespace string, name string, c client.Client, ctx context.Context) *operatorapi.SonataFlow {
- serverlessWorkflowType := &operatorapi.SonataFlow{}
- serverlessWorkflowType.Namespace = namespace
- serverlessWorkflowType.Name = name
- serverlessWorkflow := &operatorapi.SonataFlow{}
- if err := c.Get(ctx, client.ObjectKeyFromObject(serverlessWorkflowType), serverlessWorkflow); err != nil {
- klog.V(log.E).ErrorS(err, "unable to retrieve SonataFlow definition")
- }
- return serverlessWorkflow
-}
-
-func GetLastGeneration(namespace string, name string, c client.Client, ctx context.Context) int64 {
- workflow := getWorkflow(namespace, name, c, ctx)
- return workflow.Generation
-}
-
// GetAnnotationAsBool returns the boolean value from the given annotation.
// If the annotation is not present or is there an error in the ParseBool conversion, returns false.
func GetAnnotationAsBool(object client.Object, key string) bool {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]