This is an automated email from the ASF dual-hosted git repository.
wmedvedeo pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 47c73db6 kie-kogito-serverless-operator-361: Add data-index and job
service startupProbes to the workflow Deployment (#377)
47c73db6 is described below
commit 47c73db60dda8712b74de54934032c774125815d
Author: Walter Medvedeo <[email protected]>
AuthorDate: Fri Feb 2 09:10:51 2024 +0100
kie-kogito-serverless-operator-361: Add data-index and job service
startupProbes to the workflow Deployment (#377)
---
controllers/platform/services/properties.go | 6 +
controllers/platform/services/services.go | 17 +-
.../profiles/common/constants/platform_services.go | 13 +-
.../profiles/common/properties/application_test.go | 15 +-
controllers/sonataflowplatform_controller_test.go | 4 +-
controllers/workflowdef/utils.go | 113 ++++++++
controllers/workflowdef/utils_suite_test.go | 32 +++
controllers/workflowdef/utils_test.go | 285 +++++++++++++++++++++
8 files changed, 465 insertions(+), 20 deletions(-)
diff --git a/controllers/platform/services/properties.go
b/controllers/platform/services/properties.go
index dbe6dc3f..c541f5b0 100644
--- a/controllers/platform/services/properties.go
+++ b/controllers/platform/services/properties.go
@@ -24,6 +24,8 @@ import (
"net/url"
"strings"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"k8s.io/klog/v2"
@@ -164,6 +166,7 @@ func GenerateDataIndexWorkflowProperties(workflow
*operatorapi.SonataFlow, platf
if workflow != nil && !profiles.IsDevProfile(workflow) &&
di.IsServiceEnabled() {
props.Set(constants.KogitoProcessDefinitionsEventsEnabled,
"true")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
+ props.Set(constants.KogitoDataIndexHealthCheckEnabled, "true")
di := NewDataIndexHandler(platform)
p, err := di.GenerateWorkflowProperties()
if err != nil {
@@ -186,6 +189,9 @@ func GenerateJobServiceWorkflowProperties(workflow
*operatorapi.SonataFlow, plat
props.Set(constants.JobServiceRequestEventsURL,
fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
js := NewJobServiceHandler(platform)
if workflow != nil && !profiles.IsDevProfile(workflow) &&
js.IsServiceEnabled() {
+ if workflowdef.HasTimeouts(workflow) {
+ props.Set(constants.KogitoJobServiceHealthCheckEnabled,
"true")
+ }
p, err := js.GenerateWorkflowProperties()
if err != nil {
return nil, err
diff --git a/controllers/platform/services/services.go
b/controllers/platform/services/services.go
index e49679e5..05aedbb6 100644
--- a/controllers/platform/services/services.go
+++ b/controllers/platform/services/services.go
@@ -81,8 +81,6 @@ type PlatformServiceHandler interface {
GetLocalServiceBaseUrl() string
// GetServiceBaseUrl returns the base url of the service, based on
whether using local or cluster-scoped service.
GetServiceBaseUrl() string
- // GetServiceUrl returns the service url, based on whether using local
or cluster-scoped service.
- GetServiceUrl() string
// IsServiceEnabled returns true if the service is enabled in either
the spec or the status.clusterPlatformRef.
IsServiceEnabled() bool
// SetServiceUrlInStatus sets the service url in status. if reconciled
instance does not have service set in spec AND
@@ -150,10 +148,6 @@ func (d DataIndexHandler) IsServiceEnabled() bool {
return d.IsServiceEnabledInSpec() || d.isServiceEnabledInStatus()
}
-func (d DataIndexHandler) GetServiceUrl() string {
- return d.GetServiceBaseUrl() +
constants.KogitoProcessInstancesEventsPath
-}
-
func (d DataIndexHandler) GetServiceBaseUrl() string {
if d.IsServiceEnabledInSpec() {
return d.GetLocalServiceBaseUrl()
@@ -236,8 +230,9 @@ func (d DataIndexHandler) GetServiceCmName() string {
func (d DataIndexHandler) GenerateWorkflowProperties()
(*properties.Properties, error) {
props := properties.NewProperties()
if d.IsServiceEnabled() {
+ props.Set(constants.KogitoDataIndexURL, d.GetServiceBaseUrl())
props.Set(constants.KogitoProcessDefinitionsEventsURL,
d.GetServiceBaseUrl()+constants.KogitoProcessDefinitionsEventsPath)
- props.Set(constants.KogitoProcessInstancesEventsURL,
d.GetServiceUrl())
+ props.Set(constants.KogitoProcessInstancesEventsURL,
d.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath)
}
return props, nil
}
@@ -313,10 +308,6 @@ func (j JobServiceHandler) IsServiceEnabled() bool {
return j.IsServiceEnabledInSpec() || j.isServiceEnabledInStatus()
}
-func (j JobServiceHandler) GetServiceUrl() string {
- return j.GetServiceBaseUrl() + constants.JobServiceURLPath
-}
-
func (j JobServiceHandler) GetServiceBaseUrl() string {
if j.IsServiceEnabledInSpec() {
return j.GetLocalServiceBaseUrl()
@@ -411,8 +402,8 @@ func (j JobServiceHandler) GenerateServiceProperties()
(*properties.Properties,
func (j JobServiceHandler) GenerateWorkflowProperties()
(*properties.Properties, error) {
props := properties.NewProperties()
if j.IsServiceEnabled() {
- // add data source reactive URL
- props.Set(constants.JobServiceRequestEventsURL,
j.GetServiceUrl())
+ props.Set(constants.KogitoJobServiceURL, j.GetServiceBaseUrl())
+ props.Set(constants.JobServiceRequestEventsURL,
j.GetServiceBaseUrl()+constants.JobServiceJobEventsPath)
}
return props, nil
}
diff --git a/controllers/profiles/common/constants/platform_services.go
b/controllers/profiles/common/constants/platform_services.go
index 3db52e16..e0f24927 100644
--- a/controllers/profiles/common/constants/platform_services.go
+++ b/controllers/profiles/common/constants/platform_services.go
@@ -32,7 +32,7 @@ const (
JobServiceStatusChangeEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
JobServiceURLProtocol = "http"
JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url"
- JobServiceURLPath = "/v2/jobs/events"
+ JobServiceJobEventsPath = "/v2/jobs/events"
KogitoProcessEventsProtocol = "http"
KogitoProcessInstancesEventsURL =
"mp.messaging.outgoing.kogito-processinstances-events.url"
@@ -42,7 +42,16 @@ const (
KogitoProcessDefinitionsEventsEnabled =
"kogito.events.processdefinitions.enabled"
KogitoProcessDefinitionsEventsPath = "/definitions"
KogitoUserTasksEventsEnabled =
"kogito.events.usertasks.enabled"
- KogitoEventsVariablesEnabled =
"kogito.events.variables.enabled"
+ // KogitoDataIndexHealthCheckEnabled configures if a workflow must
check for the data index availability as part
+ // of its start health check.
+ KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled"
+ // KogitoDataIndexURL configures the data index url, this value can be
used internally by the workflow.
+ KogitoDataIndexURL = "kogito.data-index.url"
+ // KogitoJobServiceHealthCheckEnabled configures if a workflow must
check for the job service availability as part
+ // of its start health check.
+ KogitoJobServiceHealthCheckEnabled =
"kogito.jobs-service.health-enabled"
+ // KogitoJobServiceURL configures the jobs service url, this value can be
used internally by the workflow.
+ KogitoJobServiceURL = "kogito.jobs-service.url"
KogitoServiceURLProperty = "kogito.service.url"
KogitoServiceURLProtocol = "http"
DataIndexKafkaSmallRyeHealthProperty =
`quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled`
diff --git a/controllers/profiles/common/properties/application_test.go
b/controllers/profiles/common/properties/application_test.go
index 2a95e967..543247dd 100644
--- a/controllers/profiles/common/properties/application_test.go
+++ b/controllers/profiles/common/properties/application_test.go
@@ -239,7 +239,7 @@ func
Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.NoError(t, err)
generatedProps, propsErr =
properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
- assert.Equal(t, 15, len(generatedProps.Keys()))
+ assert.Equal(t, 18, len(generatedProps.Keys()))
assert.Equal(t,
"http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/definitions",
generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "true",
generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t,
"http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes",
generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
@@ -249,6 +249,9 @@ func
Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "",
generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, ""))
assert.Equal(t, "",
generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "",
generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
+ assert.Equal(t, "true",
generatedProps.GetString(constants.KogitoDataIndexHealthCheckEnabled, ""))
+ assert.Equal(t,
"http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace,
generatedProps.GetString(constants.KogitoDataIndexURL, ""))
+ assert.Equal(t,
"http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace,
generatedProps.GetString(constants.KogitoJobServiceURL, ""))
// disabling data index bypasses config of outgoing events url
platform.Spec.Services.DataIndex.Enabled = nil
@@ -256,7 +259,7 @@ func
Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.NoError(t, err)
generatedProps, propsErr =
properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
- assert.Equal(t, 13, len(generatedProps.Keys()))
+ assert.Equal(t, 14, len(generatedProps.Keys()))
assert.Equal(t, "",
generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "false",
generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "",
generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
@@ -265,6 +268,7 @@ func
Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t,
"http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events",
generatedProps.GetString(constants.JobServiceRequestEventsURL, ""))
assert.Equal(t, "",
generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "",
generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
+ assert.Equal(t,
"http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace,
generatedProps.GetString(constants.KogitoJobServiceURL, ""))
// disabling job service bypasses config of outgoing events url
platform.Spec.Services.JobService.Enabled = nil
@@ -465,6 +469,7 @@ func generateJobServiceWorkflowProductionProperties()
*properties.Properties {
if jobServiceProdProperties == nil {
jobServiceProdProperties = properties.NewProperties()
jobServiceProdProperties.Set("kogito.service.url",
"http://foo.default")
+ jobServiceProdProperties.Set("kogito.jobs-service.url",
"http://foo-jobs-service.default")
jobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
jobServiceProdProperties.Set("quarkus.http.port", "8080")
jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
@@ -489,7 +494,6 @@ func generateDataIndexWorkflowDevProperties()
*properties.Properties {
dataIndexDevProperties.Set("quarkus.devservices.enabled",
"false")
dataIndexDevProperties.Set("quarkus.kogito.devservices.enabled", "false")
dataIndexDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled",
"false")
- //TODO revisar, pero para el dev profile esto no va
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector",
"quarkus-http")
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url",
"http://localhost/v2/jobs/events")
dataIndexDevProperties.Set("kogito.events.processdefinitions.enabled", "false")
@@ -504,6 +508,8 @@ func generateDataIndexWorkflowProductionProperties()
*properties.Properties {
if dataIndexProdProperties == nil {
dataIndexProdProperties = properties.NewProperties()
dataIndexProdProperties.Set("kogito.service.url",
"http://foo.default")
+ dataIndexProdProperties.Set("kogito.data-index.url",
"http://foo-data-index-service.default")
+ dataIndexProdProperties.Set("kogito.data-index.health-enabled",
"true")
dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexProdProperties.Set("quarkus.http.port", "8080")
dataIndexProdProperties.Set("quarkus.devservices.enabled",
"false")
@@ -544,6 +550,9 @@ func
generateDataIndexAndJobServiceWorkflowProductionProperties() *properties.Pr
if dataIndexJobServiceProdProperties == nil {
dataIndexJobServiceProdProperties = properties.NewProperties()
dataIndexJobServiceProdProperties.Set("kogito.service.url",
"http://foo.default")
+ dataIndexJobServiceProdProperties.Set("kogito.data-index.url",
"http://foo-data-index-service.default")
+
dataIndexJobServiceProdProperties.Set("kogito.data-index.health-enabled",
"true")
+
dataIndexJobServiceProdProperties.Set("kogito.jobs-service.url",
"http://foo-jobs-service.default")
dataIndexJobServiceProdProperties.Set("quarkus.http.host",
"0.0.0.0")
dataIndexJobServiceProdProperties.Set("quarkus.http.port",
"8080")
dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled",
"false")
diff --git a/controllers/sonataflowplatform_controller_test.go
b/controllers/sonataflowplatform_controller_test.go
index 5594d644..11e5bb93 100644
--- a/controllers/sonataflowplatform_controller_test.go
+++ b/controllers/sonataflowplatform_controller_test.go
@@ -573,11 +573,11 @@ func TestSonataFlowPlatformController(t *testing.T) {
psDi := services.NewDataIndexHandler(ksp)
psDi2 := services.NewDataIndexHandler(ksp2)
assert.Equal(t,
ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url,
psDi.GetLocalServiceBaseUrl())
- assert.Equal(t,
psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath,
psDi2.GetServiceUrl())
+ assert.Equal(t,
psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath,
psDi2.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath)
psJs := services.NewJobServiceHandler(ksp)
psJs2 := services.NewJobServiceHandler(ksp2)
assert.Equal(t,
ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url,
psJs.GetLocalServiceBaseUrl())
- assert.Equal(t,
psJs.GetLocalServiceBaseUrl()+constants.JobServiceURLPath,
psJs2.GetServiceUrl())
+ assert.Equal(t,
psJs.GetLocalServiceBaseUrl()+constants.JobServiceJobEventsPath,
psJs2.GetServiceBaseUrl()+constants.JobServiceJobEventsPath)
ksp2.Spec.Services = &v1alpha08.ServicesPlatformSpec{}
diff --git a/controllers/workflowdef/utils.go b/controllers/workflowdef/utils.go
new file mode 100644
index 00000000..87a0356f
--- /dev/null
+++ b/controllers/workflowdef/utils.go
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package workflowdef
+
+import (
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+// HasTimeouts reports whether the workflow configures any of the SonataFlow
+// supported timeouts: a workflow-level execution or event timeout, a
+// state-level event timeout, a sleep state, or an action that sleeps before or
+// after its execution. It returns false for a nil workflow. This function
+// might be reviewed when more timeouts are supported.
+func HasTimeouts(workflow *operatorapi.SonataFlow) bool {
+	// A nil workflow configures nothing; report false instead of panicking.
+	if workflow == nil {
+		return false
+	}
+	flow := &workflow.Spec.Flow
+	if HasWorkflowExecTimeout(flow) || HasWorkflowEventTimeout(flow) {
+		return true
+	}
+	// Stop at the first state that configures a timeout.
+	for i := range flow.States {
+		if stateHasTimeouts(&flow.States[i]) {
+			return true
+		}
+	}
+	return false
+}
+
+// stateHasTimeouts reports whether a single state configures a timeout. A
+// state whose type-specific definition is missing (nil) is treated as having
+// no timeouts. Sleep states always count as a timeout.
+func stateHasTimeouts(state *model.State) bool {
+	switch state.Type {
+	case model.StateTypeEvent:
+		return state.EventState != nil && HasEventStateTimeouts(state.EventState)
+	case model.StateTypeOperation:
+		return state.OperationState != nil && HasOperationStateTimeouts(state.OperationState)
+	case model.StateTypeSwitch:
+		return state.SwitchState != nil && HasSwitchStateTimeouts(state.SwitchState)
+	case model.StateTypeSleep:
+		return true
+	case model.StateTypeParallel:
+		return state.ParallelState != nil && HasParallelStateTimeouts(state.ParallelState)
+	case model.StateTypeForEach:
+		return state.ForEachState != nil && HasForEachStateTimeouts(state.ForEachState)
+	case model.StateTypeCallback:
+		return state.CallbackState != nil && HasCallbackStateTimeouts(state.CallbackState)
+	default:
+		return false
+	}
+}
+
+// HasWorkflowEventTimeout reports whether the flow declares a non-empty
+// workflow-level event timeout.
+func HasWorkflowEventTimeout(flow *operatorapi.Flow) bool {
+	timeouts := flow.Timeouts
+	if timeouts == nil {
+		return false
+	}
+	return timeouts.EventTimeout != ""
+}
+// HasWorkflowExecTimeout reports whether the flow declares a workflow
+// execution timeout with a non-empty duration.
+func HasWorkflowExecTimeout(flow *operatorapi.Flow) bool {
+	timeouts := flow.Timeouts
+	if timeouts == nil || timeouts.WorkflowExecTimeout == nil {
+		return false
+	}
+	return timeouts.WorkflowExecTimeout.Duration != ""
+}
+
+// HasEventStateTimeouts reports whether the event state declares a non-empty
+// event timeout, or any of its onEvents actions sleeps before or after its
+// execution.
+func HasEventStateTimeouts(state *model.EventState) bool {
+	if timeouts := state.Timeouts; timeouts != nil && timeouts.EventTimeout != "" {
+		return true
+	}
+	for i := range state.OnEvents {
+		if hasActionsWithSleep(&state.OnEvents[i].Actions) {
+			return true
+		}
+	}
+	return false
+}
+
+// HasOperationStateTimeouts reports whether any action of the operation state
+// sleeps before or after its execution.
+func HasOperationStateTimeouts(state *model.OperationState) bool {
+	actions := &state.Actions
+	return hasActionsWithSleep(actions)
+}
+
+// HasSwitchStateTimeouts reports whether the switch state declares a non-empty
+// event timeout.
+func HasSwitchStateTimeouts(state *model.SwitchState) bool {
+	timeouts := state.Timeouts
+	if timeouts == nil {
+		return false
+	}
+	return timeouts.EventTimeout != ""
+}
+
+// HasParallelStateTimeouts reports whether any branch of the parallel state
+// contains an action that sleeps before or after its execution.
+func HasParallelStateTimeouts(state *model.ParallelState) bool {
+	for i := range state.Branches {
+		if hasBranchTimeouts(&state.Branches[i]) {
+			return true
+		}
+	}
+	return false
+}
+
+// hasBranchTimeouts reports whether any action of the branch sleeps before or
+// after its execution.
+func hasBranchTimeouts(branch *model.Branch) bool {
+	actions := &branch.Actions
+	return hasActionsWithSleep(actions)
+}
+
+// HasForEachStateTimeouts reports whether any action of the forEach state
+// sleeps before or after its execution.
+func HasForEachStateTimeouts(state *model.ForEachState) bool {
+	actions := &state.Actions
+	return hasActionsWithSleep(actions)
+}
+
+// HasCallbackStateTimeouts reports whether the callback state declares a
+// non-empty event timeout, or its action sleeps before or after its execution.
+func HasCallbackStateTimeouts(state *model.CallbackState) bool {
+	if state.Timeouts != nil && state.Timeouts.EventTimeout != "" {
+		return true
+	}
+	return hasAnySleep(&state.Action)
+}
+
+// hasActionsWithSleep reports whether at least one of the given actions sleeps
+// before or after its execution.
+func hasActionsWithSleep(actions *[]model.Action) bool {
+	list := *actions
+	for i := range list {
+		if hasAnySleep(&list[i]) {
+			return true
+		}
+	}
+	return false
+}
+
+// hasAnySleep reports whether the action configures a non-empty sleep before
+// or after its execution.
+func hasAnySleep(action *model.Action) bool {
+	sleep := action.Sleep
+	if sleep == nil {
+		return false
+	}
+	return sleep.Before != "" || sleep.After != ""
+}
diff --git a/controllers/workflowdef/utils_suite_test.go
b/controllers/workflowdef/utils_suite_test.go
new file mode 100644
index 00000000..aa3919a7
--- /dev/null
+++ b/controllers/workflowdef/utils_suite_test.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package workflowdef
+
+import (
+ "testing"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+// TestProperties is the `go test` entry point that runs the Ginkgo specs
+// registered in this package under the "Utils Suite" name.
+func TestProperties(t *testing.T) {
+ // Register Ginkgo's Fail as the Gomega failure handler before running the specs.
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Utils Suite")
+}
diff --git a/controllers/workflowdef/utils_test.go
b/controllers/workflowdef/utils_test.go
new file mode 100644
index 00000000..9e279b10
--- /dev/null
+++ b/controllers/workflowdef/utils_test.go
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package workflowdef
+
+import (
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"
+)
+
+// Durations shared by the table entries below; they are variables rather than
+// constants because the entries pass them by address.
+var (
+ // emptyDuration is a duration that is set but blank.
+ emptyDuration = ""
+ // isoDuration is a non-empty duration value (30 seconds in ISO 8601 notation).
+ isoDuration = "PT30S"
+)
+
+// Table-driven spec for HasTimeouts: every entry builds a workflow with one
+// specific timeout-related configuration and states whether HasTimeouts must
+// detect it.
+var _ = DescribeTable("Workflow has timeouts",
+	func(workflow *operatorapi.SonataFlow, expectedHasTimeouts bool) {
+		hasTimeouts := HasTimeouts(workflow)
+		Expect(hasTimeouts).Should(Equal(expectedHasTimeouts))
+	},
+	Entry("for a workflow with WorkflowExecTimeout", workflowWithWorkflowExecTimeout(&isoDuration), true),
+	Entry("for a workflow with empty WorkflowExecTimeout", workflowWithWorkflowExecTimeout(&emptyDuration), false),
+	// The nil case must really pass nil so the "no timeouts configured" path is exercised.
+	Entry("for a workflow with nil WorkflowExecTimeout", workflowWithWorkflowExecTimeout(nil), false),
+
+	Entry("for a workflow with WorkflowEventTimeout", workflowWithWorkflowEventStateTimeout(&isoDuration), true),
+	Entry("for a workflow with empty WorkflowEventTimeout", workflowWithWorkflowEventStateTimeout(&emptyDuration), false),
+	Entry("for a workflow with nil WorkflowEventTimeout", workflowWithWorkflowEventStateTimeout(nil), false),
+
+	Entry("for a workflow with EventState with timeouts", workflowWithEventStateWithTimeout(&isoDuration), true),
+	Entry("for a workflow with EventState empty timeouts", workflowWithEventStateWithTimeout(&emptyDuration), false),
+	Entry("for a workflow with EventState nil timeouts", workflowWithEventStateWithTimeout(nil), false),
+	Entry("for a workflow with EventState with action sleep at before", workflowWithEventStateWithActionSleep(true, false), true),
+	Entry("for a workflow with EventState with action sleep at after", workflowWithEventStateWithActionSleep(false, true), true),
+	Entry("for a workflow with EventState with no action sleep", workflowWithEventStateWithActionSleep(false, false), false),
+
+	// OperationState entries must be built with the OperationState helper, not the EventState one.
+	Entry("for a workflow with OperationState with action sleep at before", workflowWithOperationStateWithActionSleep(true, false), true),
+	Entry("for a workflow with OperationState with action sleep at after", workflowWithOperationStateWithActionSleep(false, true), true),
+	Entry("for a workflow with OperationState with no action sleep", workflowWithOperationStateWithActionSleep(false, false), false),
+
+	Entry("for a workflow with SwitchState with timeouts", workflowWithSwitchStateWithTimeout(&isoDuration), true),
+	Entry("for a workflow with SwitchState with empty timeouts", workflowWithSwitchStateWithTimeout(&emptyDuration), false),
+	Entry("for a workflow with SwitchState with nil timeouts", workflowWithSwitchStateWithTimeout(nil), false),
+
+	Entry("for a workflow with SleepState", workflowWithSleepState(), true),
+
+	Entry("for a workflow with ParallelState with branch with sleep at before", workflowWithParallelState(true, false), true),
+	Entry("for a workflow with ParallelState with branch with sleep at after", workflowWithParallelState(false, true), true),
+	Entry("for a workflow with ParallelState with branches with sleep at before and after", workflowWithParallelState(true, true), true),
+	Entry("for a workflow with ParallelState with no sleep branches", workflowWithParallelState(false, false), false),
+
+	Entry("for a workflow with ForEachState with action sleep at before", workflowWithForEachStateWithActionSleep(true, false), true),
+	Entry("for a workflow with ForEachState with action sleep at after", workflowWithForEachStateWithActionSleep(false, true), true),
+	Entry("for a workflow with ForEachState with no action sleep", workflowWithForEachStateWithActionSleep(false, false), false),
+
+	Entry("for a workflow with CallbackState with timeouts", workflowWithCallbackStateTimeoutAndActionSleep(&isoDuration, nil, nil), true),
+	Entry("for a workflow with CallbackState with nil timeouts and before action sleep", workflowWithCallbackStateTimeoutAndActionSleep(nil, &isoDuration, nil), true),
+	Entry("for a workflow with CallbackState with nil timeouts and after action sleep", workflowWithCallbackStateTimeoutAndActionSleep(nil, nil, &isoDuration), true),
+	Entry("for a workflow with CallbackState with nil timeouts and no action sleep", workflowWithCallbackStateTimeoutAndActionSleep(nil, nil, nil), false),
+)
+
+// workflowWithWorkflowExecTimeout builds a workflow whose workflow-level
+// execution timeout uses the given duration; a nil duration leaves the
+// workflow without timeouts.
+func workflowWithWorkflowExecTimeout(duration *string) *operatorapi.SonataFlow {
+	workflow := generateWorkflow()
+	if duration == nil {
+		return workflow
+	}
+	workflow.Spec.Flow.Timeouts = &cncfmodel.Timeouts{
+		WorkflowExecTimeout: &cncfmodel.WorkflowExecTimeout{Duration: *duration},
+	}
+	return workflow
+}
+
+// workflowWithWorkflowEventStateTimeout builds a workflow whose workflow-level
+// event timeout uses the given duration; a nil duration leaves the workflow
+// without timeouts.
+func workflowWithWorkflowEventStateTimeout(duration *string) *operatorapi.SonataFlow {
+	workflow := generateWorkflow()
+	if duration == nil {
+		return workflow
+	}
+	workflow.Spec.Flow.Timeouts = &cncfmodel.Timeouts{EventTimeout: *duration}
+	return workflow
+}
+
+// workflowWithEventStateWithTimeout builds a workflow with a single event
+// state whose event timeout uses the given duration; a nil duration leaves the
+// state without timeouts.
+func workflowWithEventStateWithTimeout(duration *string) *operatorapi.SonataFlow {
+	eventState := generateEventState()
+	if duration != nil {
+		eventState.EventState.Timeouts = &cncfmodel.EventStateTimeout{EventTimeout: *duration}
+	}
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*eventState}
+	return workflow
+}
+
+// workflowWithEventStateWithActionSleep builds a workflow with a single event
+// state whose only onEvents entry holds the actions produced by
+// generateActionsWithSleep(before, after).
+func workflowWithEventStateWithActionSleep(before bool, after bool) *operatorapi.SonataFlow {
+	eventState := generateEventState()
+	onEvents := cncfmodel.OnEvents{Actions: generateActionsWithSleep(before, after)}
+	eventState.EventState.OnEvents = []cncfmodel.OnEvents{onEvents}
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*eventState}
+	return workflow
+}
+
+// workflowWithOperationStateWithActionSleep builds a workflow with a single
+// operation state holding the actions produced by
+// generateActionsWithSleep(before, after).
+func workflowWithOperationStateWithActionSleep(before bool, after bool) *operatorapi.SonataFlow {
+	operationState := generateOperationState()
+	operationState.OperationState.Actions = generateActionsWithSleep(before, after)
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*operationState}
+	return workflow
+}
+
+// workflowWithSwitchStateWithTimeout builds a workflow with a single switch
+// state whose event timeout uses the given duration; a nil duration leaves the
+// state without timeouts.
+func workflowWithSwitchStateWithTimeout(duration *string) *operatorapi.SonataFlow {
+	switchState := generateSwitchState()
+	if duration != nil {
+		switchState.SwitchState.Timeouts = &cncfmodel.SwitchStateTimeout{EventTimeout: *duration}
+	}
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*switchState}
+	return workflow
+}
+
+// workflowWithSleepState builds a workflow whose only state is a sleep state.
+func workflowWithSleepState() *operatorapi.SonataFlow {
+	workflow := generateWorkflow()
+	sleepState := generateSleepState()
+	workflow.Spec.Flow.States = []cncfmodel.State{*sleepState}
+	return workflow
+}
+
+// workflowWithParallelState builds a workflow with a single parallel state. A
+// branch whose action sleeps before execution and/or a branch whose action
+// sleeps after execution is appended according to the flags; with both flags
+// false the state has no branches.
+func workflowWithParallelState(branchWithBeforeSleep bool, branchWithAfterSleep bool) *operatorapi.SonataFlow {
+	parallelState := generateParallelState()
+	// addBranch appends a branch whose single action uses the given sleep.
+	addBranch := func(sleep cncfmodel.Sleep) {
+		branch := cncfmodel.Branch{Actions: []cncfmodel.Action{{Sleep: &sleep}}}
+		parallelState.ParallelState.Branches = append(parallelState.ParallelState.Branches, branch)
+	}
+	if branchWithBeforeSleep {
+		addBranch(cncfmodel.Sleep{Before: "PT5S"})
+	}
+	if branchWithAfterSleep {
+		addBranch(cncfmodel.Sleep{After: "PT5S"})
+	}
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*parallelState}
+	return workflow
+}
+
+// workflowWithForEachStateWithActionSleep builds a workflow with a single
+// forEach state holding the actions produced by
+// generateActionsWithSleep(before, after).
+func workflowWithForEachStateWithActionSleep(before bool, after bool) *operatorapi.SonataFlow {
+	forEachState := generateForEachState()
+	forEachState.ForEachState.Actions = generateActionsWithSleep(before, after)
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*forEachState}
+	return workflow
+}
+
+// workflowWithCallbackStateTimeoutAndActionSleep builds a workflow with a
+// single callback state. A non-nil duration sets the state's event timeout;
+// non-nil before/after values configure the sleep of the state's action. When
+// both before and after are nil the action has no sleep at all.
+func workflowWithCallbackStateTimeoutAndActionSleep(duration *string, before *string, after *string) *operatorapi.SonataFlow {
+	callbackState := generateCallbackState()
+	if duration != nil {
+		callbackState.CallbackState.Timeouts = &cncfmodel.CallbackStateTimeout{EventTimeout: *duration}
+	}
+	var action cncfmodel.Action
+	if before != nil || after != nil {
+		sleep := &cncfmodel.Sleep{}
+		if before != nil {
+			sleep.Before = *before
+		}
+		if after != nil {
+			sleep.After = *after
+		}
+		action.Sleep = sleep
+	}
+	callbackState.CallbackState.Action = action
+	workflow := generateWorkflow()
+	workflow.Spec.Flow.States = []cncfmodel.State{*callbackState}
+	return workflow
+}
+
+// generateWorkflow returns a new SonataFlow whose spec and flow are left at
+// their zero values.
+func generateWorkflow() *operatorapi.SonataFlow {
+	return new(operatorapi.SonataFlow)
+}
+
+// generateEventState returns a state of type event with an empty EventState.
+func generateEventState() *cncfmodel.State {
+	state := &cncfmodel.State{EventState: &cncfmodel.EventState{}}
+	state.Type = cncfmodel.StateTypeEvent
+	return state
+}
+
+// generateOperationState returns a state of type operation with an empty
+// OperationState.
+func generateOperationState() *cncfmodel.State {
+	state := &cncfmodel.State{OperationState: &cncfmodel.OperationState{}}
+	state.Type = cncfmodel.StateTypeOperation
+	return state
+}
+
+// generateSwitchState returns a state of type switch with an empty SwitchState.
+func generateSwitchState() *cncfmodel.State {
+	state := &cncfmodel.State{SwitchState: &cncfmodel.SwitchState{}}
+	state.Type = cncfmodel.StateTypeSwitch
+	return state
+}
+
+// generateSleepState returns a state of type sleep with an empty SleepState.
+func generateSleepState() *cncfmodel.State {
+	state := &cncfmodel.State{SleepState: &cncfmodel.SleepState{}}
+	state.Type = cncfmodel.StateTypeSleep
+	return state
+}
+
+// generateParallelState returns a state of type parallel whose ParallelState
+// starts with an empty, non-nil list of branches.
+func generateParallelState() *cncfmodel.State {
+	parallel := &cncfmodel.ParallelState{Branches: make([]cncfmodel.Branch, 0)}
+	state := &cncfmodel.State{ParallelState: parallel}
+	state.Type = cncfmodel.StateTypeParallel
+	return state
+}
+
+// generateForEachState returns a state of type forEach with an empty
+// ForEachState.
+func generateForEachState() *cncfmodel.State {
+	state := &cncfmodel.State{ForEachState: &cncfmodel.ForEachState{}}
+	state.Type = cncfmodel.StateTypeForEach
+	return state
+}
+
+// generateCallbackState returns a state of type callback with an empty
+// CallbackState.
+func generateCallbackState() *cncfmodel.State {
+	state := &cncfmodel.State{CallbackState: &cncfmodel.CallbackState{}}
+	state.Type = cncfmodel.StateTypeCallback
+	return state
+}
+
+// generateActionsWithSleep returns up to two actions: one sleeping "PT30S"
+// before execution when before is true, followed by one sleeping "PT30S" after
+// execution when after is true. It returns a nil slice when both flags are
+// false.
+func generateActionsWithSleep(before bool, after bool) []cncfmodel.Action {
+	var actions []cncfmodel.Action
+	// addSleep appends an action configured with the given sleep.
+	addSleep := func(sleep cncfmodel.Sleep) {
+		actions = append(actions, cncfmodel.Action{Sleep: &sleep})
+	}
+	if before {
+		addSleep(cncfmodel.Sleep{Before: "PT30S"})
+	}
+	if after {
+		addSleep(cncfmodel.Sleep{After: "PT30S"})
+	}
+	return actions
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]