This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 08c8854e [KOGITO-8792] - Add events to build controllers and the
ability to restart a build (#318)
08c8854e is described below
commit 08c8854e8b3d8220043b7dce4caadfb88dfeb814
Author: Ricardo Zanini <[email protected]>
AuthorDate: Wed Dec 20 16:07:54 2023 -0300
[KOGITO-8792] - Add events to build controllers and the ability to restart
a build (#318)
* [KOGITO-8792] - Add events to build controllers and the ability to
restart a build
Signed-off-by: Ricardo Zanini <[email protected]>
* Add the ability to restart builds and signal to workflows
Signed-off-by: Ricardo Zanini <[email protected]>
* Rollout deployment after a successful build
Signed-off-by: Ricardo Zanini <[email protected]>
* Fix rollout deployment once a build finishes
Signed-off-by: Ricardo Zanini <[email protected]>
---------
Signed-off-by: Ricardo Zanini <[email protected]>
---
api/condition_types.go | 4 +-
api/status_types.go | 21 +++-----
api/v1alpha08/sonataflow_types.go | 2 +-
api/v1alpha08/sonataflowbuild_types.go | 4 ++
controllers/builder/containerbuilder.go | 3 ++
controllers/platform/defaults.go | 26 ++++++----
controllers/profiles/common/deployment.go | 30 ++++++++---
controllers/profiles/common/reconciler.go | 24 ++++-----
controllers/profiles/dev/profile_dev.go | 12 +++--
controllers/profiles/dev/profile_dev_test.go | 18 +++----
controllers/profiles/dev/states_dev.go | 12 ++++-
controllers/profiles/factory/factory.go | 7 +--
controllers/profiles/prod/deployment_handler.go | 16 +++---
.../profiles/prod/deployment_handler_test.go | 6 +--
controllers/profiles/prod/profile_prod.go | 21 ++++----
controllers/profiles/prod/profile_prod_test.go | 52 ++++++-------------
controllers/profiles/prod/states_prod.go | 48 +++++++++++++----
controllers/profiles/prod/states_prod_nobuild.go | 18 +++++--
controllers/sonataflow_controller.go | 3 +-
controllers/sonataflow_controller_test.go | 2 +-
controllers/sonataflowbuild_controller.go | 45 ++++++++++++----
controllers/sonataflowbuild_controller_test.go | 27 ++++++++++
controllers/workflows/workflows.go | 60 ++++++++++++++++++++++
test/kubernetes_cli.go | 12 +++++
test/mock_service.go | 58 ---------------------
utils/kubernetes/annotations.go | 23 +++++++++
26 files changed, 346 insertions(+), 208 deletions(-)
diff --git a/api/condition_types.go b/api/condition_types.go
index 5029b0fd..69b6e3be 100644
--- a/api/condition_types.go
+++ b/api/condition_types.go
@@ -51,7 +51,9 @@ const (
BuildFailedReason = "BuildFailedReason"
WaitingForBuildReason = "WaitingForBuild"
BuildIsRunningReason = "BuildIsRunning"
- BuildSkipped = "BuildSkipped"
+ BuildSkippedReason = "BuildSkipped"
+ BuildSuccessfulReason = "BuildSuccessful"
+ BuildMarkedToRestartReason = "BuildMarkedToRestart"
)
// Condition describes the common structure for conditions in our types
diff --git a/api/status_types.go b/api/status_types.go
index 46a2e645..b3d24224 100644
--- a/api/status_types.go
+++ b/api/status_types.go
@@ -283,21 +283,12 @@ func (s *conditionManager) MarkUnknown(t ConditionType,
reason, messageFormat st
// MarkFalse sets the status of t and the ready condition to False.
func (s *conditionManager) MarkFalse(t ConditionType, reason, messageFormat
string, messageA ...interface{}) {
- types := []ConditionType{t}
- for _, cond := range s.dependents {
- if cond == t {
- types = append(types, s.ready)
- }
- }
-
- for _, t := range types {
- s.setCondition(Condition{
- Type: t,
- Status: corev1.ConditionFalse,
- Reason: reason,
- Message: fmt.Sprintf(messageFormat, messageA...),
- })
- }
+ s.setCondition(Condition{
+ Type: t,
+ Status: corev1.ConditionFalse,
+ Reason: reason,
+ Message: fmt.Sprintf(messageFormat, messageA...),
+ })
}
// InitializeConditions updates all Conditions in the ConditionSet to Unknown
diff --git a/api/v1alpha08/sonataflow_types.go
b/api/v1alpha08/sonataflow_types.go
index eca3b605..169aad0a 100644
--- a/api/v1alpha08/sonataflow_types.go
+++ b/api/v1alpha08/sonataflow_types.go
@@ -694,7 +694,7 @@ func (s *SonataFlowStatus) Manager() api.ConditionsManager {
}
func (s *SonataFlowStatus) IsWaitingForPlatform() bool {
- cond := s.GetCondition(api.RunningConditionType)
+ cond := s.GetCondition(api.BuiltConditionType)
return cond.IsFalse() && cond.Reason == api.WaitingForPlatformReason
}
diff --git a/api/v1alpha08/sonataflowbuild_types.go
b/api/v1alpha08/sonataflowbuild_types.go
index 3a3dd178..0817936d 100644
--- a/api/v1alpha08/sonataflowbuild_types.go
+++ b/api/v1alpha08/sonataflowbuild_types.go
@@ -22,6 +22,7 @@ package v1alpha08
import (
"encoding/json"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -50,6 +51,9 @@ const (
BuildPhaseError BuildPhase = "Error"
)
+// BuildRestartAnnotation marks a SonataFlowBuild to restart
+const BuildRestartAnnotation = metadata.Domain + "/restartBuild"
+
// BuildTemplate an abstraction over the actual build process performed by the
platform.
// +k8s:openapi-gen=true
type BuildTemplate struct {
diff --git a/controllers/builder/containerbuilder.go
b/controllers/builder/containerbuilder.go
index 8956d763..15918a86 100644
--- a/controllers/builder/containerbuilder.go
+++ b/controllers/builder/containerbuilder.go
@@ -89,6 +89,9 @@ func (c *containerBuilderManager) Schedule(build
*operatorapi.SonataFlowBuild) e
return err
}
build.Status.BuildPhase =
operatorapi.BuildPhase(containerBuilder.Status.Phase)
+ if len(build.Status.BuildPhase) == 0 {
+ build.Status.BuildPhase = operatorapi.BuildPhaseInitialization
+ }
build.Status.Error = containerBuilder.Status.Error
return nil
}
diff --git a/controllers/platform/defaults.go b/controllers/platform/defaults.go
index f5e5396e..95d8a4c8 100644
--- a/controllers/platform/defaults.go
+++ b/controllers/platform/defaults.go
@@ -22,6 +22,7 @@ package platform
import (
"context"
+ "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
@@ -60,24 +61,27 @@ func ConfigureDefaults(ctx context.Context, c
client.Client, p *operatorapi.Sona
klog.V(log.I).InfoS("Maven Timeout set", "timeout",
p.Spec.Build.Config.Timeout.Duration)
}
- updatePlatform(ctx, c, p)
-
- return nil
+ return createOrUpdatePlatform(ctx, c, p)
}
-func updatePlatform(ctx context.Context, c client.Client, p
*operatorapi.SonataFlowPlatform) {
+func createOrUpdatePlatform(ctx context.Context, c client.Client, p
*operatorapi.SonataFlowPlatform) error {
config := operatorapi.SonataFlowPlatform{}
- errGet := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name:
p.Name}, &config)
- if errGet != nil {
- klog.V(log.E).ErrorS(errGet, "Error reading the Platform")
+ err := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name},
&config)
+ if errors.IsNotFound(err) {
+ klog.V(log.D).ErrorS(err, "Platform not found, creating it")
+ return c.Create(ctx, p)
+ } else if err != nil {
+ klog.V(log.E).ErrorS(err, "Error reading the Platform")
+ return err
}
+
config.Spec = p.Spec
config.Status.Cluster = p.Status.Cluster
-
- updateErr := c.Update(ctx, &config)
- if updateErr != nil {
- klog.V(log.E).ErrorS(updateErr, "Error updating the
BuildPlatform")
+ err = c.Update(ctx, &config)
+ if err != nil {
+ klog.V(log.E).ErrorS(err, "Error updating the BuildPlatform")
}
+ return err
}
func newDefaultSonataFlowPlatform(namespace string)
*operatorapi.SonataFlowPlatform {
diff --git a/controllers/profiles/common/deployment.go
b/controllers/profiles/common/deployment.go
index f3ad8e6d..64b67767 100644
--- a/controllers/profiles/common/deployment.go
+++ b/controllers/profiles/common/deployment.go
@@ -24,6 +24,7 @@ import (
"fmt"
appsv1 "k8s.io/api/apps/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -36,18 +37,20 @@ import (
kubeutil
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
)
-var _ WorkflowDeploymentHandler = &deploymentHandler{}
+var _ WorkflowDeploymentManager = &deploymentHandler{}
-// WorkflowDeploymentHandler interface to handle workflow deployment features.
-type WorkflowDeploymentHandler interface {
+// WorkflowDeploymentManager interface to handle workflow deployment features.
+type WorkflowDeploymentManager interface {
// SyncDeploymentStatus updates the workflow status aligned with the
deployment counterpart.
// For example, if the deployment is in a failed state, it sets the
status to
// Running `false` and the Message and Reason to human-readable format.
SyncDeploymentStatus(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, error)
+ // RolloutDeployment rolls out the underlying deployment object for the
given workflow.
+ RolloutDeployment(ctx context.Context, workflow
*operatorapi.SonataFlow) error
}
-// DeploymentHandler creates a new WorkflowDeploymentHandler implementation
based on the current profile.
-func DeploymentHandler(c client.Client) WorkflowDeploymentHandler {
+// DeploymentManager creates a new WorkflowDeploymentManager implementation
based on the current profile.
+func DeploymentManager(c client.Client) WorkflowDeploymentManager {
return &deploymentHandler{c: c}
}
@@ -55,7 +58,22 @@ type deploymentHandler struct {
c client.Client
}
-func (d deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, error) {
+func (d *deploymentHandler) RolloutDeployment(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
+ deployment := &appsv1.Deployment{}
+ if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow),
deployment); err != nil {
+ // Deployment not found, nothing to do.
+ if errors.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }
+ if err := kubeutil.MarkDeploymentToRollout(deployment); err != nil {
+ return err
+ }
+ return d.c.Update(ctx, deployment)
+}
+
+func (d *deploymentHandler) SyncDeploymentStatus(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, error) {
deployment := &appsv1.Deployment{}
if err := d.c.Get(ctx, client.ObjectKeyFromObject(workflow),
deployment); err != nil {
// we should have the deployment by this time, so even if the
error above is not found, we should halt.
diff --git a/controllers/profiles/common/reconciler.go
b/controllers/profiles/common/reconciler.go
index 3e6f568d..64ecd07a 100644
--- a/controllers/profiles/common/reconciler.go
+++ b/controllers/profiles/common/reconciler.go
@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
+ "k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -36,8 +37,9 @@ import (
// StateSupport is the shared structure with common accessors used throughout
the whole reconciliation profiles
type StateSupport struct {
- C client.Client
- Catalog discovery.ServiceCatalog
+ C client.Client
+ Catalog discovery.ServiceCatalog
+ Recorder record.EventRecorder
}
// PerformStatusUpdate updates the SonataFlow Status conditions
@@ -51,29 +53,23 @@ func (s StateSupport) PerformStatusUpdate(ctx
context.Context, workflow *operato
return true, err
}
-// PostReconcile function to perform all the other operations required after
the reconciliation - placeholder for null pattern usages
-func (s StateSupport) PostReconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
- //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
- return nil
-}
-
-// BaseReconciler is the base structure used by every reconciliation profile.
-// Use NewBaseProfileReconciler to build a new reference.
-type BaseReconciler struct {
+// Reconciler is the base structure used by every reconciliation profile.
+// Use NewReconciler to build a new reference.
+type Reconciler struct {
*StateSupport
reconciliationStateMachine *ReconciliationStateMachine
objects []client.Object
}
-func NewBaseProfileReconciler(support *StateSupport, stateMachine
*ReconciliationStateMachine) BaseReconciler {
- return BaseReconciler{
+func NewReconciler(support *StateSupport, stateMachine
*ReconciliationStateMachine) Reconciler {
+ return Reconciler{
StateSupport: support,
reconciliationStateMachine: stateMachine,
}
}
// Reconcile does the actual reconciliation algorithm based on a set of
ReconciliationState
-func (b *BaseReconciler) Reconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, error) {
+func (b *Reconciler) Reconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, error) {
workflow.Status.Manager().InitializeConditions()
result, objects, err := b.reconciliationStateMachine.do(ctx, workflow)
if err != nil {
diff --git a/controllers/profiles/dev/profile_dev.go
b/controllers/profiles/dev/profile_dev.go
index fbc6928a..959d73ae 100644
--- a/controllers/profiles/dev/profile_dev.go
+++ b/controllers/profiles/dev/profile_dev.go
@@ -21,6 +21,7 @@ package dev
import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
+ "k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -34,17 +35,18 @@ import (
var _ profiles.ProfileReconciler = &developmentProfile{}
type developmentProfile struct {
- common.BaseReconciler
+ common.Reconciler
}
func (d developmentProfile) GetProfile() metadata.ProfileType {
return metadata.DevProfile
}
-func NewProfileReconciler(client client.Client) profiles.ProfileReconciler {
+func NewProfileReconciler(client client.Client, recorder record.EventRecorder)
profiles.ProfileReconciler {
support := &common.StateSupport{
- C: client,
- Catalog: discovery.NewServiceCatalog(client),
+ C: client,
+ Catalog: discovery.NewServiceCatalog(client),
+ Recorder: recorder,
}
var ensurers *objectEnsurers
@@ -63,7 +65,7 @@ func NewProfileReconciler(client client.Client)
profiles.ProfileReconciler {
&recoverFromFailureState{StateSupport: support})
profile := &developmentProfile{
- BaseReconciler: common.NewBaseProfileReconciler(support,
stateMachine),
+ Reconciler: common.NewReconciler(support, stateMachine),
}
klog.V(log.I).InfoS("Reconciling in", "profile", profile.GetProfile())
diff --git a/controllers/profiles/dev/profile_dev_test.go
b/controllers/profiles/dev/profile_dev_test.go
index f8c2b3f7..b09ec12e 100644
--- a/controllers/profiles/dev/profile_dev_test.go
+++ b/controllers/profiles/dev/profile_dev_test.go
@@ -55,7 +55,7 @@ func Test_OverrideStartupProbe(t *testing.T) {
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -82,7 +82,7 @@ func Test_recoverFromFailureNoDeployment(t *testing.T) {
workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.DeploymentFailureReason, "")
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- reconciler := NewProfileReconciler(client)
+ reconciler := NewProfileReconciler(client, test.NewFakeRecorder())
// we are in failed state and have no objects
result, err := reconciler.Reconcile(context.TODO(), workflow)
@@ -123,7 +123,7 @@ func Test_newDevProfile(t *testing.T) {
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -196,7 +196,7 @@ func Test_newDevProfile(t *testing.T) {
func Test_devProfileImageDefaultsNoPlatform(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -213,7 +213,7 @@ func Test_devProfileWithImageSnapshotOverrideWithPlatform(t
*testing.T) {
platform :=
test.GetBasePlatformWithDevBaseImageInReadyPhase(workflow.Namespace)
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
platform).WithStatusSubresource(workflow, platform).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -230,7 +230,7 @@ func
Test_devProfileWithWPlatformWithoutDevBaseImageAndWithBaseImage(t *testing.
platform :=
test.GetBasePlatformWithBaseImageInReadyPhase(workflow.Namespace)
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
platform).WithStatusSubresource(workflow, platform).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -247,7 +247,7 @@ func
Test_devProfileWithPlatformWithoutDevBaseImageAndWithoutBaseImage(t *testin
platform := test.GetBasePlatformInReadyPhase(workflow.Namespace)
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
platform).WithStatusSubresource(workflow, platform).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
@@ -266,7 +266,7 @@ func Test_newDevProfileWithExternalConfigMaps(t *testing.T)
{
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow).WithStatusSubresource(workflow).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
camelXmlRouteFileName := "camelroute-xml"
xmlRoute := `<route routeConfigurationId="xmlError">
@@ -380,7 +380,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
configMap).WithStatusSubresource(workflow, configMap).Build()
- devReconciler := NewProfileReconciler(client)
+ devReconciler := NewProfileReconciler(client, test.NewFakeRecorder())
result, err := devReconciler.Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
diff --git a/controllers/profiles/dev/states_dev.go
b/controllers/profiles/dev/states_dev.go
index 2726880d..e965547e 100644
--- a/controllers/profiles/dev/states_dev.go
+++ b/controllers/profiles/dev/states_dev.go
@@ -135,6 +135,11 @@ func (e *ensureRunningWorkflowState) Do(ctx
context.Context, workflow *operatora
return ctrl.Result{RequeueAfter: constants.RequeueAfterIsRunning},
objs, nil
}
+func (e *ensureRunningWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
+}
+
type followWorkflowDeploymentState struct {
*common.StateSupport
enrichers *statusEnrichers
@@ -145,7 +150,7 @@ func (f *followWorkflowDeploymentState)
CanReconcile(workflow *operatorapi.Sonat
}
func (f *followWorkflowDeploymentState) Do(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
- result, err := common.DeploymentHandler(f.C).SyncDeploymentStatus(ctx,
workflow)
+ result, err := common.DeploymentManager(f.C).SyncDeploymentStatus(ctx,
workflow)
if err != nil {
return ctrl.Result{RequeueAfter:
constants.RequeueAfterFailure}, nil, err
}
@@ -247,3 +252,8 @@ func (r *recoverFromFailureState) Do(ctx context.Context,
workflow *operatorapi.
}
return ctrl.Result{RequeueAfter:
constants.RequeueRecoverDeploymentErrorInterval}, nil, nil
}
+
+func (r *recoverFromFailureState) PostReconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
+}
diff --git a/controllers/profiles/factory/factory.go
b/controllers/profiles/factory/factory.go
index 04a91f3e..8cf27959 100644
--- a/controllers/profiles/factory/factory.go
+++ b/controllers/profiles/factory/factory.go
@@ -20,6 +20,7 @@
package factory
import (
+ "k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
@@ -35,7 +36,7 @@ const (
opsProfile metadata.ProfileType = "prod_for_ops"
)
-type reconcilerBuilder func(client client.Client) profiles.ProfileReconciler
+type reconcilerBuilder func(client client.Client, recorder
record.EventRecorder) profiles.ProfileReconciler
var profileBuilders = map[metadata.ProfileType]reconcilerBuilder{
metadata.ProdProfile: prod.NewProfileReconciler,
@@ -58,6 +59,6 @@ func profileBuilder(workflow *operatorapi.SonataFlow)
reconcilerBuilder {
}
// NewReconciler creates a new ProfileReconciler based on the given workflow
and context.
-func NewReconciler(client client.Client, workflow *operatorapi.SonataFlow)
profiles.ProfileReconciler {
- return profileBuilder(workflow)(client)
+func NewReconciler(client client.Client, recorder record.EventRecorder,
workflow *operatorapi.SonataFlow) profiles.ProfileReconciler {
+ return profileBuilder(workflow)(client, recorder)
}
diff --git a/controllers/profiles/prod/deployment_handler.go
b/controllers/profiles/prod/deployment_handler.go
index bf31f6a2..8bced1f5 100644
--- a/controllers/profiles/prod/deployment_handler.go
+++ b/controllers/profiles/prod/deployment_handler.go
@@ -31,23 +31,23 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
)
-type deploymentHandler struct {
+type deploymentReconciler struct {
*common.StateSupport
ensurers *objectEnsurers
}
-func newDeploymentHandler(stateSupport *common.StateSupport, ensurer
*objectEnsurers) *deploymentHandler {
- return &deploymentHandler{
+func newDeploymentReconciler(stateSupport *common.StateSupport, ensurer
*objectEnsurers) *deploymentReconciler {
+ return &deploymentReconciler{
StateSupport: stateSupport,
ensurers: ensurer,
}
}
-func (d *deploymentHandler) handle(ctx context.Context, workflow
*operatorapi.SonataFlow) (reconcile.Result, []client.Object, error) {
- return d.handleWithImage(ctx, workflow, "")
+func (d *deploymentReconciler) reconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) (reconcile.Result, []client.Object, error) {
+ return d.reconcileWithBuiltImage(ctx, workflow, "")
}
-func (d *deploymentHandler) handleWithImage(ctx context.Context, workflow
*operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object,
error) {
+func (d *deploymentReconciler) reconcileWithBuiltImage(ctx context.Context,
workflow *operatorapi.SonataFlow, image string) (reconcile.Result,
[]client.Object, error) {
pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace)
propsCM, _, err := d.ensurers.propertiesConfigMap.Ensure(ctx, workflow,
common.WorkflowPropertiesMutateVisitor(ctx, d.StateSupport.Catalog, workflow,
pl))
if err != nil {
@@ -86,7 +86,7 @@ func (d *deploymentHandler) handleWithImage(ctx
context.Context, workflow *opera
}
// Follow deployment status
- result, err := common.DeploymentHandler(d.C).SyncDeploymentStatus(ctx,
workflow)
+ result, err := common.DeploymentManager(d.C).SyncDeploymentStatus(ctx,
workflow)
if err != nil {
return reconcile.Result{Requeue: false}, nil, err
}
@@ -97,7 +97,7 @@ func (d *deploymentHandler) handleWithImage(ctx
context.Context, workflow *opera
return result, objs, nil
}
-func (d *deploymentHandler) getDeploymentMutateVisitors(
+func (d *deploymentReconciler) getDeploymentMutateVisitors(
workflow *operatorapi.SonataFlow,
image string,
configMap *v1.ConfigMap) []common.MutateVisitor {
diff --git a/controllers/profiles/prod/deployment_handler_test.go
b/controllers/profiles/prod/deployment_handler_test.go
index bde14159..21513a4b 100644
--- a/controllers/profiles/prod/deployment_handler_test.go
+++ b/controllers/profiles/prod/deployment_handler_test.go
@@ -33,9 +33,9 @@ func Test_CheckPodTemplateChangesReflectDeployment(t
*testing.T) {
WithStatusSubresource(workflow).
Build()
stateSupport := fakeReconcilerSupport(client)
- handler := newDeploymentHandler(stateSupport,
newObjectEnsurers(stateSupport))
+ handler := newDeploymentReconciler(stateSupport,
newObjectEnsurers(stateSupport))
- result, objects, err := handler.handle(context.TODO(), workflow)
+ result, objects, err := handler.reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.NotEmpty(t, objects)
assert.True(t, result.Requeue)
@@ -44,7 +44,7 @@ func Test_CheckPodTemplateChangesReflectDeployment(t
*testing.T) {
expectedImg := "quay.io/apache/my-new-workflow:1.0.0"
workflow.Spec.PodTemplate.Container.Image = expectedImg
utilruntime.Must(client.Update(context.TODO(), workflow))
- result, objects, err = handler.handle(context.TODO(), workflow)
+ result, objects, err = handler.reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.NotEmpty(t, objects)
assert.True(t, result.Requeue)
diff --git a/controllers/profiles/prod/profile_prod.go
b/controllers/profiles/prod/profile_prod.go
index a6048f59..3254950f 100644
--- a/controllers/profiles/prod/profile_prod.go
+++ b/controllers/profiles/prod/profile_prod.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
+ "k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -34,7 +35,7 @@ import (
var _ profiles.ProfileReconciler = &prodProfile{}
type prodProfile struct {
- common.BaseReconciler
+ common.Reconciler
}
const (
@@ -64,10 +65,11 @@ func newObjectEnsurers(support *common.StateSupport)
*objectEnsurers {
// NewProfileReconciler the default profile builder which includes a build
state to run an internal build process
// to have an immutable workflow image deployed
-func NewProfileReconciler(client client.Client) profiles.ProfileReconciler {
+func NewProfileReconciler(client client.Client, recorder record.EventRecorder)
profiles.ProfileReconciler {
support := &common.StateSupport{
- C: client,
- Catalog: discovery.NewServiceCatalog(client),
+ C: client,
+ Catalog: discovery.NewServiceCatalog(client),
+ Recorder: recorder,
}
// the reconciliation state machine
stateMachine := common.NewReconciliationStateMachine(
@@ -76,7 +78,7 @@ func NewProfileReconciler(client client.Client)
profiles.ProfileReconciler {
&deployWithBuildWorkflowState{StateSupport: support, ensurers:
newObjectEnsurers(support)},
)
reconciler := &prodProfile{
- BaseReconciler: common.NewBaseProfileReconciler(support,
stateMachine),
+ Reconciler: common.NewReconciler(support, stateMachine),
}
return reconciler
@@ -84,10 +86,11 @@ func NewProfileReconciler(client client.Client)
profiles.ProfileReconciler {
// NewProfileForOpsReconciler creates an alternative prod profile that won't
require to build the workflow image in order to deploy
// the workflow application. It assumes that the image has been built
somewhere else.
-func NewProfileForOpsReconciler(client client.Client)
profiles.ProfileReconciler {
+func NewProfileForOpsReconciler(client client.Client, recorder
record.EventRecorder) profiles.ProfileReconciler {
support := &common.StateSupport{
- C: client,
- Catalog: discovery.NewServiceCatalog(client),
+ C: client,
+ Catalog: discovery.NewServiceCatalog(client),
+ Recorder: recorder,
}
// the reconciliation state machine
stateMachine := common.NewReconciliationStateMachine(
@@ -95,7 +98,7 @@ func NewProfileForOpsReconciler(client client.Client)
profiles.ProfileReconciler
&followDeployWorkflowState{StateSupport: support, ensurers:
newObjectEnsurers(support)},
)
reconciler := &prodProfile{
- BaseReconciler: common.NewBaseProfileReconciler(support,
stateMachine),
+ Reconciler: common.NewReconciler(support, stateMachine),
}
return reconciler
diff --git a/controllers/profiles/prod/profile_prod_test.go
b/controllers/profiles/prod/profile_prod_test.go
index efa7041b..689ac3f4 100644
--- a/controllers/profiles/prod/profile_prod_test.go
+++ b/controllers/profiles/prod/profile_prod_test.go
@@ -24,17 +24,14 @@ import (
"testing"
"time"
- corev1 "k8s.io/api/core/v1"
-
+ "github.com/apache/incubator-kie-kogito-serverless-operator/api"
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common"
-
+ "github.com/apache/incubator-kie-kogito-serverless-operator/test"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
clientruntime "sigs.k8s.io/controller-runtime/pkg/client"
-
- "github.com/apache/incubator-kie-kogito-serverless-operator/api"
- operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
- "github.com/apache/incubator-kie-kogito-serverless-operator/test"
)
func Test_Reconciler_ProdOps(t *testing.T) {
@@ -47,18 +44,18 @@ func Test_Reconciler_ProdOps(t *testing.T) {
client := test.NewSonataFlowClientBuilder().
WithRuntimeObjects(workflow).
WithStatusSubresource(workflow,
&operatorapi.SonataFlowBuild{}).Build()
- result, err :=
NewProfileForOpsReconciler(client).Reconcile(context.TODO(), workflow)
+ result, err := NewProfileForOpsReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.NotNil(t, result.RequeueAfter)
assert.True(t,
workflow.Status.GetCondition(api.BuiltConditionType).IsFalse())
- assert.Equal(t, api.BuildSkipped,
workflow.Status.GetCondition(api.BuiltConditionType).Reason)
+ assert.Equal(t, api.BuildSkippedReason,
workflow.Status.GetCondition(api.BuiltConditionType).Reason)
// We need the deployment controller to tell us that the workflow is
ready
// Since we don't have it in a mocked env, the result must be ready ==
false
assert.False(t, workflow.Status.IsReady())
// Reconcile again to run the ddeployment handler
- result, err =
NewProfileForOpsReconciler(client).Reconcile(context.TODO(), workflow)
+ result, err = NewProfileForOpsReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
// Let's check for the right creation of the workflow (one CM volume,
one container with a custom image)
@@ -86,7 +83,7 @@ func Test_Reconciler_ProdCustomPod(t *testing.T) {
client := test.NewSonataFlowClientBuilder().
WithRuntimeObjects(workflow, build, platform).
WithStatusSubresource(workflow, build, platform).Build()
- _, err := NewProfileReconciler(client).Reconcile(context.TODO(),
workflow)
+ _, err := NewProfileReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
// Let's check for the right creation of the workflow (one CM volume,
one container with a custom image)
@@ -107,7 +104,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) {
WithRuntimeObjects(workflow, platform).
WithStatusSubresource(workflow, platform,
&operatorapi.SonataFlowBuild{}).Build()
- result, err := NewProfileReconciler(client).Reconcile(context.TODO(),
workflow)
+ result, err := NewProfileReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.NotNil(t, result.RequeueAfter)
@@ -115,7 +112,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) {
assert.False(t, workflow.Status.IsReady())
// still building
- result, err = NewProfileReconciler(client).Reconcile(context.TODO(),
workflow)
+ result, err = NewProfileReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.Equal(t, requeueWhileWaitForBuild, result.RequeueAfter)
assert.True(t, workflow.Status.IsBuildRunningOrUnknown())
@@ -128,15 +125,15 @@ func Test_reconcilerProdBuildConditions(t *testing.T) {
assert.NoError(t, client.Status().Update(context.TODO(), build))
// last reconciliation cycle waiting for build
- result, err = NewProfileReconciler(client).Reconcile(context.TODO(),
workflow)
+ result, err = NewProfileReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.Equal(t, requeueWhileWaitForBuild, result.RequeueAfter)
assert.False(t, workflow.Status.IsBuildRunningOrUnknown())
assert.False(t, workflow.Status.IsReady())
- assert.Equal(t, api.WaitingForBuildReason,
workflow.Status.GetTopLevelCondition().Reason)
+ assert.Equal(t, api.WaitingForDeploymentReason,
workflow.Status.GetTopLevelCondition().Reason)
// now we create the objects
- result, err = NewProfileReconciler(client).Reconcile(context.TODO(),
workflow)
+ result, err = NewProfileReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.False(t, workflow.Status.IsBuildRunningOrUnknown())
assert.False(t, workflow.Status.IsReady())
@@ -154,7 +151,7 @@ func Test_reconcilerProdBuildConditions(t *testing.T) {
err = client.Status().Update(context.TODO(), deployment)
assert.NoError(t, err)
- result, err = NewProfileReconciler(client).Reconcile(context.TODO(),
workflow)
+ result, err = NewProfileReconciler(client,
test.NewFakeRecorder()).Reconcile(context.TODO(), workflow)
assert.NoError(t, err)
assert.False(t, workflow.Status.IsBuildRunningOrUnknown())
assert.True(t, workflow.Status.IsReady())
@@ -187,24 +184,6 @@ func
Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
assert.NoError(t, err)
assert.False(t, workflow.Status.IsReady())
assert.Equal(t, api.WaitingForDeploymentReason,
workflow.Status.GetTopLevelCondition().Reason)
-
- // let's mess with the deployment
- /* TODO the state should be able to enforce:
https://issues.redhat.com/browse/KOGITO-8524
- deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort =
9090
- err = client.Update(context.TODO(), deployment)
- assert.NoError(t, err)
- result, objects, err = handler.Do(context.TODO(), workflow)
- assert.True(t, result.Requeue)
- assert.NoError(t, err)
- assert.NotNil(t, result)
- assert.Len(t, objects, 2)
- // the reconciliation state should guarantee our port
- deployment = &appsv1.Deployment{}
- err = client.Get(context.TODO(),
clientruntime.ObjectKeyFromObject(workflow), deployment)
- assert.NoError(t, err)
- assert.Equal(t, int32(8080),
deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort)
- */
-
}
func Test_GenerationAnnotationCheck(t *testing.T) {
@@ -248,6 +227,7 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
func fakeReconcilerSupport(client clientruntime.Client) *common.StateSupport {
return &common.StateSupport{
- C: client,
+ C: client,
+ Recorder: test.NewFakeRecorder(),
}
}
diff --git a/controllers/profiles/prod/states_prod.go
b/controllers/profiles/prod/states_prod.go
index 9770e8ea..feb30dca 100644
--- a/controllers/profiles/prod/states_prod.go
+++ b/controllers/profiles/prod/states_prod.go
@@ -22,6 +22,7 @@ package prod
import (
"context"
+ corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -56,6 +57,8 @@ func (h *newBuilderState) Do(ctx context.Context, workflow
*operatorapi.SonataFl
_, err = h.PerformStatusUpdate(ctx, workflow)
return ctrl.Result{RequeueAfter:
requeueWhileWaitForPlatform}, nil, err
}
+ // We won't record events here to avoid spamming multiple
events to the object, the status should alert the admin
+ // since a namespace without a platform means incorrect
configuration.
klog.V(log.E).ErrorS(err, "Failed to get active platform")
return ctrl.Result{RequeueAfter: requeueWhileWaitForPlatform},
nil, err
}
@@ -66,7 +69,7 @@ func (h *newBuilderState) Do(ctx context.Context, workflow
*operatorapi.SonataFl
if err != nil {
//If we are not able to retrieve or create a Build CR for this
Workflow we will mark
klog.V(log.E).ErrorS(err, "Failed to retrieve or create a Build
CR")
- workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.WaitingForBuildReason,
+ workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildFailedReason,
"Failed to retrieve or create a Build CR",
workflow.Namespace)
_, err = h.PerformStatusUpdate(ctx, workflow)
return ctrl.Result{}, nil, err
@@ -76,20 +79,25 @@ func (h *newBuilderState) Do(ctx context.Context, workflow
*operatorapi.SonataFl
workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildIsRunningReason, "")
workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.WaitingForBuildReason, "")
_, err = h.PerformStatusUpdate(ctx, workflow)
+ h.Recorder.Eventf(workflow, corev1.EventTypeNormal,
api.BuildIsRunningReason, "Workflow %s build has started.", workflow.Name)
} else {
- // TODO: not ideal, but we will improve it on
https://issues.redhat.com/browse/KOGITO-8792
- klog.V(log.I).InfoS("Build is in failed state, try to delete
the SonataFlowBuild to restart a new build cycle")
+ klog.V(log.I).InfoS("Build is in failed state, you can mark the
build to rebuild by setting to 'true' the ", "annotation",
operatorapi.BuildRestartAnnotation)
}
return ctrl.Result{RequeueAfter: requeueAfterStartingBuild}, nil, err
}
+func (h *newBuilderState) PostReconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
+}
+
type followBuildStatusState struct {
*common.StateSupport
}
func (h *followBuildStatusState) CanReconcile(workflow
*operatorapi.SonataFlow) bool {
- return workflow.Status.IsBuildRunningOrUnknown()
+ return workflow.Status.IsBuildRunningOrUnknown() ||
workflow.Status.IsWaitingForBuild()
}
func (h *followBuildStatusState) Do(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
@@ -101,24 +109,32 @@ func (h *followBuildStatusState) Do(ctx context.Context,
workflow *operatorapi.S
if _, err = h.PerformStatusUpdate(ctx, workflow); err != nil {
return ctrl.Result{}, nil, err
}
- return ctrl.Result{RequeueAfter:
constants.RequeueAfterFailure}, nil, nil
+ return ctrl.Result{RequeueAfter:
constants.RequeueAfterFailure}, nil, err
}
if build.Status.BuildPhase == operatorapi.BuildPhaseSucceeded {
klog.V(log.I).InfoS("Workflow build has finished")
+ if workflow.Status.IsReady() {
+ // Rollout our deployment to take the latest changes in
the new image.
+ if err :=
common.DeploymentManager(h.C).RolloutDeployment(ctx, workflow); err != nil {
+ return ctrl.Result{RequeueAfter:
constants.RequeueAfterFailure}, nil, err
+ }
+ h.Recorder.Eventf(workflow, corev1.EventTypeNormal,
api.WaitingForDeploymentReason, "Rolling out workflow %s deployment.",
workflow.Name)
+ }
+ workflow.Status.Manager().MarkFalse(api.RunningConditionType,
api.WaitingForDeploymentReason, "Build has finished, rolling out deployment")
//If we have finished a build and the workflow is not running,
we will start the provisioning phase
workflow.Status.Manager().MarkTrue(api.BuiltConditionType)
_, err = h.PerformStatusUpdate(ctx, workflow)
+ h.Recorder.Eventf(workflow, corev1.EventTypeNormal,
api.BuildSuccessfulReason, "Workflow %s build has been finished successfully.",
workflow.Name)
} else if build.Status.BuildPhase == operatorapi.BuildPhaseFailed ||
build.Status.BuildPhase == operatorapi.BuildPhaseError {
- // TODO: we should handle build failures
https://issues.redhat.com/browse/KOGITO-8792
- // TODO: ideally, we can have a configuration per platform of
how many attempts we try to rebuild
- // TODO: to rebuild, just do buildManager.MarkToRestart. The
controller will then try to rebuild the workflow.
workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildFailedReason,
"Workflow %s build failed. Error: %s", workflow.Name,
build.Status.Error)
_, err = h.PerformStatusUpdate(ctx, workflow)
+ h.Recorder.Eventf(workflow, corev1.EventTypeWarning,
api.BuildFailedReason, "Workflow %s build has failed. Error: %s",
workflow.Name, build.Status.Error)
} else if build.Status.BuildPhase == operatorapi.BuildPhaseRunning &&
!workflow.Status.IsBuildRunning() {
workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildIsRunningReason, "")
_, err = h.PerformStatusUpdate(ctx, workflow)
+ h.Recorder.Eventf(workflow, corev1.EventTypeNormal,
api.BuildIsRunningReason, "Workflow %s build is running.", workflow.Name)
}
if err != nil {
@@ -128,6 +144,11 @@ func (h *followBuildStatusState) Do(ctx context.Context,
workflow *operatorapi.S
return ctrl.Result{RequeueAfter: requeueWhileWaitForBuild}, nil, nil
}
+func (h *followBuildStatusState) PostReconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
+}
+
type deployWithBuildWorkflowState struct {
*common.StateSupport
ensurers *objectEnsurers
@@ -157,18 +178,23 @@ func (h *deployWithBuildWorkflowState) Do(ctx
context.Context, workflow *operato
}
if h.isWorkflowChanged(workflow) { // Let's check that the 2
resWorkflowDef definition are different
- workflow.Status.Manager().MarkUnknown(api.RunningConditionType,
"", "")
if err = buildManager.MarkToRestart(build); err != nil {
return ctrl.Result{}, nil, err
}
- workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildIsRunningReason, "Marked to restart")
+ workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildIsRunningReason, "Build marked to restart")
workflow.Status.Manager().MarkUnknown(api.RunningConditionType,
"", "")
_, err = h.PerformStatusUpdate(ctx, workflow)
+ h.Recorder.Eventf(workflow, corev1.EventTypeNormal,
api.BuildMarkedToRestartReason, "Workflow %s will start a new build.",
workflow.Name)
return ctrl.Result{Requeue: false}, nil, err
}
// didn't change, business as usual
- return newDeploymentHandler(h.StateSupport,
h.ensurers).handleWithImage(ctx, workflow, build.Status.ImageTag)
+ return newDeploymentReconciler(h.StateSupport,
h.ensurers).reconcileWithBuiltImage(ctx, workflow, build.Status.ImageTag)
+}
+
+func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
}
// isWorkflowChanged marks the workflow status as unknown to require a new
build reconciliation
diff --git a/controllers/profiles/prod/states_prod_nobuild.go
b/controllers/profiles/prod/states_prod_nobuild.go
index b80b58c9..46449ce1 100644
--- a/controllers/profiles/prod/states_prod_nobuild.go
+++ b/controllers/profiles/prod/states_prod_nobuild.go
@@ -33,12 +33,12 @@ type ensureBuildSkipped struct {
func (f *ensureBuildSkipped) CanReconcile(workflow *operatorapi.SonataFlow)
bool {
return workflow.Status.GetCondition(api.BuiltConditionType).IsUnknown()
||
workflow.Status.GetCondition(api.BuiltConditionType).IsTrue() ||
- workflow.Status.GetCondition(api.BuiltConditionType).Reason !=
api.BuildSkipped
+ workflow.Status.GetCondition(api.BuiltConditionType).Reason !=
api.BuildSkippedReason
}
func (f *ensureBuildSkipped) Do(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
// We skip the build, so let's ensure the status reflect that
- workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildSkipped, "")
+ workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildSkippedReason, "")
if _, err := f.PerformStatusUpdate(ctx, workflow); err != nil {
return ctrl.Result{Requeue: false}, nil, err
}
@@ -46,6 +46,11 @@ func (f *ensureBuildSkipped) Do(ctx context.Context,
workflow *operatorapi.Sonat
return ctrl.Result{Requeue: true}, nil, nil
}
+func (f *ensureBuildSkipped) PostReconcile(ctx context.Context, workflow
*operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
+}
+
type followDeployWorkflowState struct {
*common.StateSupport
ensurers *objectEnsurers
@@ -53,9 +58,14 @@ type followDeployWorkflowState struct {
func (f *followDeployWorkflowState) CanReconcile(workflow
*operatorapi.SonataFlow) bool {
// we always reconcile since in this flow we don't mind building
anything, just reconcile the deployment state
- return workflow.Status.GetCondition(api.BuiltConditionType).Reason ==
api.BuildSkipped
+ return workflow.Status.GetCondition(api.BuiltConditionType).Reason ==
api.BuildSkippedReason
}
func (f *followDeployWorkflowState) Do(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
- return newDeploymentHandler(f.StateSupport, f.ensurers).handle(ctx,
workflow)
+ return newDeploymentReconciler(f.StateSupport,
f.ensurers).reconcile(ctx, workflow)
+}
+
+func (f *followDeployWorkflowState) PostReconcile(ctx context.Context,
workflow *operatorapi.SonataFlow) error {
+ //By default, we don't want to perform anything after the
reconciliation, and so we will simply return no error
+ return nil
}
diff --git a/controllers/sonataflow_controller.go
b/controllers/sonataflow_controller.go
index c8afde21..4e6bf745 100644
--- a/controllers/sonataflow_controller.go
+++ b/controllers/sonataflow_controller.go
@@ -95,7 +95,7 @@ func (r *SonataFlowReconciler) Reconcile(ctx context.Context,
req ctrl.Request)
return reconcile.Result{}, nil
}
- return profiles.NewReconciler(r.Client, workflow).Reconcile(ctx,
workflow)
+ return profiles.NewReconciler(r.Client, r.Recorder,
workflow).Reconcile(ctx, workflow)
}
func platformEnqueueRequestsFromMapFunc(c client.Client, p
*operatorapi.SonataFlowPlatform) []reconcile.Request {
@@ -135,7 +135,6 @@ func buildEnqueueRequestsFromMapFunc(c client.Client, b
*operatorapi.SonataFlowB
var requests []reconcile.Request
if b.Status.BuildPhase == operatorapi.BuildPhaseSucceeded {
-
// Fetch the Workflow instance
workflow := &operatorapi.SonataFlow{}
namespacedName := types.NamespacedName{
diff --git a/controllers/sonataflow_controller_test.go
b/controllers/sonataflow_controller_test.go
index ebb5c868..fd9d04e7 100644
--- a/controllers/sonataflow_controller_test.go
+++ b/controllers/sonataflow_controller_test.go
@@ -49,7 +49,7 @@ func TestSonataFlowController(t *testing.T) {
// Create a fake client to mock API calls.
cl :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(objs...).WithStatusSubresource(ksw,
ksp).Build()
// Create a SonataFlowReconciler object with the scheme and
fake client.
- r := &SonataFlowReconciler{Client: cl, Scheme: cl.Scheme()}
+ r := &SonataFlowReconciler{Client: cl, Scheme: cl.Scheme(),
Recorder: test.NewFakeRecorder()}
// Mock request to simulate Reconcile() being called on an
event for a
// watched resource .
diff --git a/controllers/sonataflowbuild_controller.go
b/controllers/sonataflowbuild_controller.go
index 81ff9c09..e9b98a95 100644
--- a/controllers/sonataflowbuild_controller.go
+++ b/controllers/sonataflowbuild_controller.go
@@ -25,6 +25,8 @@ import (
"reflect"
"time"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflows"
+ kubeutil
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"k8s.io/klog/v2"
buildv1 "github.com/openshift/api/build/v1"
@@ -87,19 +89,15 @@ func (r *SonataFlowBuildReconciler) Reconcile(ctx
context.Context, req ctrl.Requ
return ctrl.Result{}, err
}
- if phase == operatorapi.BuildPhaseNone {
- if err = buildManager.Schedule(build); err != nil {
- return ctrl.Result{}, err
- }
- return ctrl.Result{RequeueAfter: requeueAfterForNewBuild},
r.manageStatusUpdate(ctx, build)
- // TODO: this smells, why not just else? review in the future:
https://issues.redhat.com/browse/KOGITO-8785
+ if phase == operatorapi.BuildPhaseNone ||
kubeutil.GetAnnotationAsBool(build, operatorapi.BuildRestartAnnotation) {
+ return r.scheduleNewBuild(ctx, buildManager, build)
} else if phase != operatorapi.BuildPhaseSucceeded && phase !=
operatorapi.BuildPhaseError && phase != operatorapi.BuildPhaseFailed {
beforeReconcileStatus := build.Status.DeepCopy()
if err = buildManager.Reconcile(build); err != nil {
return ctrl.Result{}, err
}
if !reflect.DeepEqual(build.Status, beforeReconcileStatus) {
- if err = r.manageStatusUpdate(ctx, build); err != nil {
+ if err = r.manageStatusUpdate(ctx, build,
beforeReconcileStatus.BuildPhase); err != nil {
return ctrl.Result{}, err
}
}
@@ -109,10 +107,37 @@ func (r *SonataFlowBuildReconciler) Reconcile(ctx
context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}
-func (r *SonataFlowBuildReconciler) manageStatusUpdate(ctx context.Context,
instance *operatorapi.SonataFlowBuild) error {
+func (r *SonataFlowBuildReconciler) scheduleNewBuild(ctx context.Context,
buildManager builder.BuildManager, build *operatorapi.SonataFlowBuild)
(ctrl.Result, error) {
+ if err := buildManager.Schedule(build); err != nil {
+ return ctrl.Result{}, err
+ }
+ if err := r.manageStatusUpdate(ctx, build, ""); err != nil {
+ return ctrl.Result{}, err
+ }
+ if kubeutil.GetAnnotationAsBool(build,
operatorapi.BuildRestartAnnotation) {
+ // Remove restart annotation to not enter in infinity
reconciliation loop
+ kubeutil.SetAnnotation(build,
operatorapi.BuildRestartAnnotation, "false")
+ if err := r.Update(ctx, build); err != nil {
+ return ctrl.Result{}, err
+ }
+ // Signals to the workflow that we are rebuilding
+ workflowManager, err := workflows.NewManager(r.Client, ctx,
build.Namespace, build.Name)
+ if err != nil {
+ return ctrl.Result{}, err
+ }
+ if err := workflowManager.SetBuiltStatusToRunning("Build marked
to restart"); err != nil {
+ return ctrl.Result{}, err
+ }
+ }
+
+ return ctrl.Result{RequeueAfter: requeueAfterForNewBuild}, nil
+}
+
+func (r *SonataFlowBuildReconciler) manageStatusUpdate(ctx context.Context,
instance *operatorapi.SonataFlowBuild, beforeReconcilePhase
operatorapi.BuildPhase) error {
err := r.Status().Update(ctx, instance)
- if err == nil {
- r.Recorder.Event(instance, corev1.EventTypeNormal, "Updated",
fmt.Sprintf("Updated buildphase to %s", instance.Status.BuildPhase))
+ // Don't need to spam events if the phase hasn't changed
+ if err == nil && beforeReconcilePhase != instance.Status.BuildPhase {
+ r.Recorder.Event(instance, corev1.EventTypeNormal, "Updated",
fmt.Sprintf("Updated buildphase to %s", instance.Status.BuildPhase))
}
return err
}
diff --git a/controllers/sonataflowbuild_controller_test.go
b/controllers/sonataflowbuild_controller_test.go
index 53b10958..6ec69a6e 100644
--- a/controllers/sonataflowbuild_controller_test.go
+++ b/controllers/sonataflowbuild_controller_test.go
@@ -111,3 +111,30 @@ func TestSonataFlowBuildController_WithArgsAndEnv(t
*testing.T) {
assert.Len(t, containerBuild.Spec.Tasks[0].Kaniko.AdditionalFlags, 1)
assert.Len(t, containerBuild.Spec.Tasks[0].Kaniko.Envs, 1)
}
+
+func TestSonataFlowBuildController_MarkToRestart(t *testing.T) {
+ namespace := t.Name()
+ ksw := test.GetBaseSonataFlow(namespace)
+ ksb := test.GetNewEmptySonataFlowBuild(ksw.Name, namespace)
+ ksb.Annotations = map[string]string{operatorapi.BuildRestartAnnotation:
"true"}
+
+ cl := test.NewSonataFlowClientBuilder().
+ WithRuntimeObjects(ksb, ksw).
+ WithRuntimeObjects(test.GetBasePlatformInReadyPhase(namespace)).
+ WithRuntimeObjects(test.GetSonataFlowBuilderConfig(namespace)).
+ WithStatusSubresource(ksb, ksw).
+ Build()
+
+ r := &SonataFlowBuildReconciler{cl, cl.Scheme(),
&record.FakeRecorder{}, &rest.Config{}}
+ req := reconcile.Request{
+ NamespacedName: types.NamespacedName{
+ Name: ksb.Name,
+ Namespace: ksb.Namespace,
+ },
+ }
+
+ _, err := r.Reconcile(context.TODO(), req)
+ assert.NoError(t, err)
+ ksb = test.MustGetBuild(t, cl, types.NamespacedName{Name: ksb.Name,
Namespace: namespace})
+ assert.Equal(t, "false",
ksb.Annotations[operatorapi.BuildRestartAnnotation])
+}
diff --git a/controllers/workflows/workflows.go
b/controllers/workflows/workflows.go
new file mode 100644
index 00000000..d3b4c27b
--- /dev/null
+++ b/controllers/workflows/workflows.go
@@ -0,0 +1,60 @@
+// Copyright 2023 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package workflows
+
+import (
+ "context"
+
+ "github.com/apache/incubator-kie-kogito-serverless-operator/api"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ WorkflowManager = &workflowManager{}
+
+// WorkflowManager offers a management interface for operations with
SonataFlows instances outside the controller's package.
+// Meant to be used by other packages that don't have access to a SonataFlow
instance coming from a reconciliation cycle.
+type WorkflowManager interface {
+ SetBuiltStatusToRunning(message string) error
+ GetWorkflow() *v1alpha08.SonataFlow
+}
+
+type workflowManager struct {
+ workflow *v1alpha08.SonataFlow
+ client client.Client
+ ctx context.Context
+}
+
+func (w *workflowManager) GetWorkflow() *v1alpha08.SonataFlow {
+ return w.workflow
+}
+
+func (w *workflowManager) SetBuiltStatusToRunning(message string) error {
+ w.workflow.Status.Manager().MarkFalse(api.BuiltConditionType,
api.BuildIsRunningReason, message)
+ return w.client.Status().Update(w.ctx, w.workflow)
+}
+
+func NewManager(client client.Client, ctx context.Context, ns, name string)
(WorkflowManager, error) {
+ workflow := &v1alpha08.SonataFlow{}
+ if err := client.Get(ctx, types.NamespacedName{Name: name, Namespace:
ns}, workflow); err != nil {
+ return nil, err
+ }
+ return &workflowManager{
+ workflow: workflow,
+ client: client,
+ ctx: ctx,
+ }, nil
+}
diff --git a/test/kubernetes_cli.go b/test/kubernetes_cli.go
index 716542c4..c4d7021f 100644
--- a/test/kubernetes_cli.go
+++ b/test/kubernetes_cli.go
@@ -32,12 +32,17 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
)
+func NewFakeRecorder() record.EventRecorder {
+ return record.NewFakeRecorder(10)
+}
+
// NewSonataFlowClientBuilder creates a new fake.ClientBuilder with the right
scheme references
func NewSonataFlowClientBuilder() *fake.ClientBuilder {
s := scheme.Scheme
@@ -78,6 +83,13 @@ func MustGetWorkflow(t *testing.T, client ctrl.WithWatch,
name types.NamespacedN
return mustGet(t, client, workflow, workflow).(*operatorapi.SonataFlow)
}
+func MustGetBuild(t *testing.T, client ctrl.WithWatch, name
types.NamespacedName) *operatorapi.SonataFlowBuild {
+ build := &operatorapi.SonataFlowBuild{}
+ err := client.Get(context.TODO(), name, build)
+ assert.NoError(t, err)
+ return build
+}
+
func mustGet(t *testing.T, client ctrl.WithWatch, workflow
*operatorapi.SonataFlow, obj ctrl.Object) ctrl.Object {
err := client.Get(context.TODO(), ctrl.ObjectKeyFromObject(workflow),
obj)
assert.NoError(t, err)
diff --git a/test/mock_service.go b/test/mock_service.go
index 6109cce3..2003c5d6 100644
--- a/test/mock_service.go
+++ b/test/mock_service.go
@@ -22,8 +22,6 @@ package test
import (
"context"
- "k8s.io/klog/v2"
-
oappsv1 "github.com/openshift/api/apps/v1"
buildv1 "github.com/openshift/api/build/v1"
consolev1 "github.com/openshift/api/console/v1"
@@ -35,10 +33,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientv1 "sigs.k8s.io/controller-runtime/pkg/client"
- "sigs.k8s.io/controller-runtime/pkg/client/fake"
-
- apiv08
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
- "github.com/apache/incubator-kie-kogito-serverless-operator/log"
)
type MockPlatformService struct {
@@ -56,10 +50,6 @@ type MockPlatformService struct {
StatusFunc func() clientv1.StatusWriter
}
-func MockService() *MockPlatformService {
- return MockServiceWithExtraScheme()
-}
-
var knownTypes = map[schema.GroupVersion][]runtime.Object{
corev1.SchemeGroupVersion: {
&corev1.PersistentVolumeClaim{},
@@ -106,54 +96,6 @@ var knownTypes = map[schema.GroupVersion][]runtime.Object{
},
}
-func MockServiceWithExtraScheme(objs ...runtime.Object) *MockPlatformService {
- registerObjs := []runtime.Object{&apiv08.SonataFlow{},
&apiv08.SonataFlowList{}}
- registerObjs = append(registerObjs, objs...)
- apiv08.SchemeBuilder.Register(registerObjs...)
- scheme, _ := apiv08.SchemeBuilder.Build()
- for gv, types := range knownTypes {
- for _, t := range types {
- scheme.AddKnownTypes(gv, t)
- }
- }
- client := fake.NewFakeClientWithScheme(scheme)
- klog.V(log.D).InfoS("Fake client created", "client", client)
- return &MockPlatformService{
- Client: client,
- scheme: scheme,
- CreateFunc: func(ctx context.Context, obj clientv1.Object, opts
...clientv1.CreateOption) error {
- return client.Create(ctx, obj, opts...)
- },
- DeleteFunc: func(ctx context.Context, obj clientv1.Object, opts
...clientv1.DeleteOption) error {
- return client.Delete(ctx, obj, opts...)
- },
- GetFunc: func(ctx context.Context, key clientv1.ObjectKey, obj
clientv1.Object) error {
- return client.Get(ctx, key, obj)
- },
- ListFunc: func(ctx context.Context, list clientv1.ObjectList,
opts ...clientv1.ListOption) error {
- return client.List(ctx, list, opts...)
- },
- UpdateFunc: func(ctx context.Context, obj clientv1.Object, opts
...clientv1.UpdateOption) error {
- return client.Update(ctx, obj, opts...)
- },
- PatchFunc: func(ctx context.Context, obj clientv1.Object, patch
clientv1.Patch, opts ...clientv1.PatchOption) error {
- return client.Patch(ctx, obj, patch, opts...)
- },
- DeleteAllOfFunc: func(ctx context.Context, obj clientv1.Object,
opts ...clientv1.DeleteAllOfOption) error {
- return client.DeleteAllOf(ctx, obj, opts...)
- },
- GetCachedFunc: func(ctx context.Context, key
clientv1.ObjectKey, obj clientv1.Object) error {
- return client.Get(ctx, key, obj)
- },
- GetSchemeFunc: func() *runtime.Scheme {
- return scheme
- },
- StatusFunc: func() clientv1.StatusWriter {
- return client.Status()
- },
- }
-}
-
func (service *MockPlatformService) Create(ctx context.Context, obj
clientv1.Object, opts ...clientv1.CreateOption) error {
return service.CreateFunc(ctx, obj, opts...)
}
diff --git a/utils/kubernetes/annotations.go b/utils/kubernetes/annotations.go
index e3bac5eb..24d9ba4a 100644
--- a/utils/kubernetes/annotations.go
+++ b/utils/kubernetes/annotations.go
@@ -21,6 +21,7 @@ package kubernetes
import (
"context"
+ "strconv"
"k8s.io/klog/v2"
@@ -45,3 +46,25 @@ func GetLastGeneration(namespace string, name string, c
client.Client, ctx conte
workflow := getWorkflow(namespace, name, c, ctx)
return workflow.Generation
}
+
+// GetAnnotationAsBool returns the boolean value from the given annotation.
+// If the annotation is not present or is there an error in the ParseBool
conversion, returns false.
+func GetAnnotationAsBool(object client.Object, key string) bool {
+ if object.GetAnnotations() != nil {
+ b, err := strconv.ParseBool(object.GetAnnotations()[key])
+ if err != nil {
+ return false
+ }
+ return b
+ }
+ return false
+}
+
+// SetAnnotation Safely set the annotation to the object
+func SetAnnotation(object client.Object, key, value string) {
+ if object.GetAnnotations() != nil {
+ object.GetAnnotations()[key] = value
+ } else {
+ object.SetAnnotations(map[string]string{key: value})
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]