This is an automated email from the ASF dual-hosted git repository.
ricardozanini pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-serverless-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 2b5056d1 [KOGITO-9982] Add Persistence field in SonataFlow CRD (#322)
2b5056d1 is described below
commit 2b5056d12bb9a18116070d7e58f3b777218ca385
Author: Jordi Gil <[email protected]>
AuthorDate: Fri Jan 26 08:46:05 2024 -0500
[KOGITO-9982] Add Persistence field in SonataFlow CRD (#322)
* Added Persistence field in SonataFlow CRD
Signed-off-by: Jordi Gil <[email protected]>
* Added implementation to manage PostgreSQL persistence type in the
SonataFlow CRD
Signed-off-by: Jordi Gil <[email protected]>
* Added end to end tests
Signed-off-by: Jordi Gil <[email protected]>
* Retry health endpoints
Signed-off-by: Jordi Gil <[email protected]>
* Changes based on Ricardo's feedback
Signed-off-by: Jordi Gil <[email protected]>
* Reorganized persistence e2e test and tweaked health check to cover for
workflows that end very quickly, like callbackstatetimeouts
Signed-off-by: Jordi Gil <[email protected]>
* Changes after merge conflicts with platform e2e tests
Signed-off-by: Jordi Gil <[email protected]>
* Added additional check for failing test in workflowproj
Signed-off-by: Jordi Gil <[email protected]>
---------
Signed-off-by: Jordi Gil <[email protected]>
---
api/v1alpha08/sonataflow_types.go | 3 +
api/v1alpha08/zz_generated.deepcopy.go | 5 +
bundle/manifests/sonataflow.org_sonataflows.yaml | 59 ++++++++
config/crd/bases/sonataflow.org_sonataflows.yaml | 59 ++++++++
controllers/platform/services/services.go | 148 ++----------------
controllers/profiles/common/object_creators.go | 21 ++-
.../profiles/common/object_creators_test.go | 168 +++++++++++++++++++++
.../profiles/common/persistence/postgreSQL.go | 94 ++++++++++++
operator.yaml | 59 ++++++++
test/e2e/helpers.go | 27 ++++
test/e2e/platform_test.go | 3 +
test/e2e/workflow_test.go | 75 ++++++++-
.../workflow/by_service/01-postgres.yaml | 86 +++++++++++
.../by_service/02-sonataflow_platform.yaml | 27 ++++
.../03-sonataflow_callbackstatetimeouts.sw.yaml | 102 +++++++++++++
.../workflow/by_service/kustomization.yaml | 33 ++++
test/yaml.go | 34 +++--
workflowproj/workflowproj_test.go | 3 +
18 files changed, 855 insertions(+), 151 deletions(-)
diff --git a/api/v1alpha08/sonataflow_types.go
b/api/v1alpha08/sonataflow_types.go
index 169aad0a..37bd50f3 100644
--- a/api/v1alpha08/sonataflow_types.go
+++ b/api/v1alpha08/sonataflow_types.go
@@ -657,6 +657,9 @@ type SonataFlowSpec struct {
// PodTemplate describes the deployment details of this SonataFlow
instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="podTemplate"
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
+ // Persists service to a datasource of choice. Ephemeral by default.
+ // +optional
+ Persistence *PersistenceOptions `json:"persistence,omitempty"`
}
// SonataFlowStatus defines the observed state of SonataFlow
diff --git a/api/v1alpha08/zz_generated.deepcopy.go
b/api/v1alpha08/zz_generated.deepcopy.go
index 0260a749..3c24b8c6 100644
--- a/api/v1alpha08/zz_generated.deepcopy.go
+++ b/api/v1alpha08/zz_generated.deepcopy.go
@@ -902,6 +902,11 @@ func (in *SonataFlowSpec) DeepCopyInto(out
*SonataFlowSpec) {
in.Flow.DeepCopyInto(&out.Flow)
in.Resources.DeepCopyInto(&out.Resources)
in.PodTemplate.DeepCopyInto(&out.PodTemplate)
+ if in.Persistence != nil {
+ in, out := &in.Persistence, &out.Persistence
+ *out = new(PersistenceOptions)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new SonataFlowSpec.
diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml
b/bundle/manifests/sonataflow.org_sonataflows.yaml
index 5c026eb2..13d4cf53 100644
--- a/bundle/manifests/sonataflow.org_sonataflows.yaml
+++ b/bundle/manifests/sonataflow.org_sonataflows.yaml
@@ -2102,6 +2102,65 @@ spec:
required:
- states
type: object
+ persistence:
+ description: Persists service to a datasource of choice.
Ephemeral
+ by default.
+ maxProperties: 1
+ properties:
+ postgresql:
+ description: Connect configured services to a postgresql
database.
+ maxProperties: 2
+ minProperties: 2
+ properties:
+ jdbcUrl:
+ description: PostgreSql JDBC URL. Mutually exclusive
to serviceRef.
+ e.g.
"jdbc:postgresql://host:port/database?currentSchema=data-index-service"
+ type: string
+ secretRef:
+ description: Secret reference to the database user
credentials
+ properties:
+ name:
+ description: Name of the postgresql credentials
secret.
+ type: string
+ passwordKey:
+ description: Defaults to POSTGRESQL_PASSWORD
+ type: string
+ userKey:
+ description: Defaults to POSTGRESQL_USER
+ type: string
+ required:
+ - name
+ type: object
+ serviceRef:
+ description: Service reference to postgresql
datasource. Mutually
+ exclusive to jdbcUrl.
+ properties:
+ databaseName:
+ description: Name of postgresql database to be
used. Defaults
+ to "sonataflow"
+ type: string
+ databaseSchema:
+ description: Schema of postgresql database to be
used.
+ Defaults to "data-index-service"
+ type: string
+ name:
+ description: Name of the postgresql k8s service.
+ type: string
+ namespace:
+ description: Namespace of the postgresql k8s
service.
+ Defaults to the SonataFlowPlatform's local
namespace.
+ type: string
+ port:
+ description: Port to use when connecting to the
postgresql
+ k8s service. Defaults to 5432.
+ type: integer
+ required:
+ - name
+ type: object
+ required:
+ - secretRef
+ type: object
+ type: object
podTemplate:
description: PodTemplate describes the deployment details of
this
SonataFlow instance.
diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml
b/config/crd/bases/sonataflow.org_sonataflows.yaml
index 1123a77c..4ec54e7f 100644
--- a/config/crd/bases/sonataflow.org_sonataflows.yaml
+++ b/config/crd/bases/sonataflow.org_sonataflows.yaml
@@ -2103,6 +2103,65 @@ spec:
required:
- states
type: object
+ persistence:
+ description: Persists service to a datasource of choice.
Ephemeral
+ by default.
+ maxProperties: 1
+ properties:
+ postgresql:
+ description: Connect configured services to a postgresql
database.
+ maxProperties: 2
+ minProperties: 2
+ properties:
+ jdbcUrl:
+ description: PostgreSql JDBC URL. Mutually exclusive
to serviceRef.
+ e.g.
"jdbc:postgresql://host:port/database?currentSchema=data-index-service"
+ type: string
+ secretRef:
+ description: Secret reference to the database user
credentials
+ properties:
+ name:
+ description: Name of the postgresql credentials
secret.
+ type: string
+ passwordKey:
+ description: Defaults to POSTGRESQL_PASSWORD
+ type: string
+ userKey:
+ description: Defaults to POSTGRESQL_USER
+ type: string
+ required:
+ - name
+ type: object
+ serviceRef:
+ description: Service reference to postgresql
datasource. Mutually
+ exclusive to jdbcUrl.
+ properties:
+ databaseName:
+ description: Name of postgresql database to be
used. Defaults
+ to "sonataflow"
+ type: string
+ databaseSchema:
+ description: Schema of postgresql database to be
used.
+ Defaults to "data-index-service"
+ type: string
+ name:
+ description: Name of the postgresql k8s service.
+ type: string
+ namespace:
+ description: Namespace of the postgresql k8s
service.
+ Defaults to the SonataFlowPlatform's local
namespace.
+ type: string
+ port:
+ description: Port to use when connecting to the
postgresql
+ k8s service. Defaults to 5432.
+ type: integer
+ required:
+ - name
+ type: object
+ required:
+ - secretRef
+ type: object
+ type: object
podTemplate:
description: PodTemplate describes the deployment details of
this
SonataFlow instance.
diff --git a/controllers/platform/services/services.go
b/controllers/platform/services/services.go
index 3637c275..c04b4eb3 100644
--- a/controllers/platform/services/services.go
+++ b/controllers/platform/services/services.go
@@ -21,19 +21,24 @@ package services
import (
"fmt"
- "strconv"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/persistence"
"github.com/magiconair/properties"
"github.com/apache/incubator-kie-kogito-serverless-operator/version"
"github.com/imdario/mergo"
)
+const (
+ quarkusHibernateORMDatabaseGeneration string =
"QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
+ quarkusFlywayMigrateAtStart string =
"QUARKUS_FLYWAY_MIGRATE_AT_START"
+)
+
type PlatformServiceHandler interface {
// GetContainerName returns the name of the service's container in the
deployment.
GetContainerName() string
@@ -137,7 +142,9 @@ func (d DataIndexHandler)
ConfigurePersistence(containerSpec *corev1.Container)
if d.platform.Spec.Services.DataIndex.Persistence != nil &&
d.platform.Spec.Services.DataIndex.Persistence.PostgreSql != nil {
c := containerSpec.DeepCopy()
c.Image =
d.GetServiceImageName(constants.PersistenceTypePostgreSQL)
- c.Env = append(c.Env,
d.configurePostgreSqlEnv(d.platform.Spec.Services.DataIndex.Persistence.PostgreSql,
d.GetServiceName(), d.platform.Namespace)...)
+ c.Env = append(c.Env,
persistence.ConfigurePostgreSqlEnv(d.platform.Spec.Services.DataIndex.Persistence.PostgreSql,
d.GetServiceName(), d.platform.Namespace)...)
+ // specific to DataIndex
+ c.Env = append(c.Env, corev1.EnvVar{Name:
quarkusHibernateORMDatabaseGeneration, Value: "update"}, corev1.EnvVar{Name:
quarkusFlywayMigrateAtStart, Value: "true"})
return c
}
return containerSpec
@@ -160,74 +167,6 @@ func (d DataIndexHandler) GetServiceCmName() string {
return fmt.Sprintf("%s-props", d.GetServiceName())
}
-func (d DataIndexHandler) configurePostgreSqlEnv(postgresql
*operatorapi.PersistencePostgreSql, databaseSchema, databaseNamespace string)
[]corev1.EnvVar {
- dataSourcePort := constants.DefaultPostgreSQLPort
- databaseName := "sonataflow"
- dataSourceURL := postgresql.JdbcUrl
- if postgresql.ServiceRef != nil {
- if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
- databaseSchema = postgresql.ServiceRef.DatabaseSchema
- }
- if len(postgresql.ServiceRef.Namespace) > 0 {
- databaseNamespace = postgresql.ServiceRef.Namespace
- }
- if postgresql.ServiceRef.Port != nil {
- dataSourcePort = *postgresql.ServiceRef.Port
- }
- if len(postgresql.ServiceRef.DatabaseName) > 0 {
- databaseName = postgresql.ServiceRef.DatabaseName
- }
- dataSourceURL = "jdbc:" + constants.PersistenceTypePostgreSQL +
"://" + postgresql.ServiceRef.Name + "." + databaseNamespace + ":" +
strconv.Itoa(dataSourcePort) + "/" + databaseName + "?currentSchema=" +
databaseSchema
- }
- secretRef := corev1.LocalObjectReference{
- Name: postgresql.SecretRef.Name,
- }
- quarkusDatasourceUsername := "POSTGRESQL_USER"
- if len(postgresql.SecretRef.UserKey) > 0 {
- quarkusDatasourceUsername = postgresql.SecretRef.UserKey
- }
- quarkusDatasourcePassword := "POSTGRESQL_PASSWORD"
- if len(postgresql.SecretRef.PasswordKey) > 0 {
- quarkusDatasourcePassword = postgresql.SecretRef.PasswordKey
- }
- return []corev1.EnvVar{
- {
- Name: "QUARKUS_DATASOURCE_USERNAME",
- ValueFrom: &corev1.EnvVarSource{
- SecretKeyRef: &corev1.SecretKeySelector{
- Key:
quarkusDatasourceUsername,
- LocalObjectReference: secretRef,
- },
- },
- },
- {
- Name: "QUARKUS_DATASOURCE_PASSWORD",
- ValueFrom: &corev1.EnvVarSource{
- SecretKeyRef: &corev1.SecretKeySelector{
- Key:
quarkusDatasourcePassword,
- LocalObjectReference: secretRef,
- },
- },
- },
- {
- Name: "QUARKUS_DATASOURCE_DB_KIND",
- Value: constants.PersistenceTypePostgreSQL,
- },
- {
- Name: "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION",
- Value: "update",
- },
- {
- Name: "QUARKUS_FLYWAY_MIGRATE_AT_START",
- Value: "true",
- },
- {
- Name: "QUARKUS_DATASOURCE_JDBC_URL",
- Value: dataSourceURL,
- },
- }
-}
-
func (d DataIndexHandler) GenerateWorkflowProperties()
(*properties.Properties, error) {
props := properties.NewProperties()
if d.platform.Spec.Services.DataIndex != nil {
@@ -318,7 +257,9 @@ func (j JobServiceHandler)
ConfigurePersistence(containerSpec *corev1.Container)
if j.platform.Spec.Services.JobService.Persistence != nil &&
j.platform.Spec.Services.JobService.Persistence.PostgreSql != nil {
c := containerSpec.DeepCopy()
c.Image =
j.GetServiceImageName(constants.PersistenceTypePostgreSQL)
- c.Env = append(c.Env,
j.configurePostgreSqlEnv(j.platform.Spec.Services.JobService.Persistence.PostgreSql,
j.GetServiceName(), j.platform.Namespace)...)
+ c.Env = append(c.Env,
persistence.ConfigurePostgreSqlEnv(j.platform.Spec.Services.JobService.Persistence.PostgreSql,
j.GetServiceName(), j.platform.Namespace)...)
+ // Specific to Job Service
+ c.Env = append(c.Env, corev1.EnvVar{Name:
quarkusFlywayMigrateAtStart, Value: "true"})
return c
}
return containerSpec
@@ -330,71 +271,6 @@ func (j JobServiceHandler) MergePodSpec(podSpec
corev1.PodSpec) (corev1.PodSpec,
return *c, err
}
-func (j JobServiceHandler) configurePostgreSqlEnv(postgresql
*operatorapi.PersistencePostgreSql, databaseSchema, databaseNamespace string)
[]corev1.EnvVar {
- dataSourcePort := constants.DefaultPostgreSQLPort
- databaseName := "sonataflow"
- dataSourceURL := postgresql.JdbcUrl
- if postgresql.ServiceRef != nil {
- if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
- databaseSchema = postgresql.ServiceRef.DatabaseSchema
- }
- if len(postgresql.ServiceRef.Namespace) > 0 {
- databaseNamespace = postgresql.ServiceRef.Namespace
- }
- if postgresql.ServiceRef.Port != nil {
- dataSourcePort = *postgresql.ServiceRef.Port
- }
- if len(postgresql.ServiceRef.DatabaseName) > 0 {
- databaseName = postgresql.ServiceRef.DatabaseName
- }
- dataSourceURL = "jdbc:" + constants.PersistenceTypePostgreSQL +
"://" + postgresql.ServiceRef.Name + "." + databaseNamespace + ":" +
strconv.Itoa(dataSourcePort) + "/" + databaseName + "?currentSchema=" +
databaseSchema
- }
-
- secretRef := corev1.LocalObjectReference{
- Name: postgresql.SecretRef.Name,
- }
- quarkusDatasourceUsername := "POSTGRESQL_USER"
- if len(postgresql.SecretRef.UserKey) > 0 {
- quarkusDatasourceUsername = postgresql.SecretRef.UserKey
- }
- quarkusDatasourcePassword := "POSTGRESQL_PASSWORD"
- if len(postgresql.SecretRef.PasswordKey) > 0 {
- quarkusDatasourcePassword = postgresql.SecretRef.PasswordKey
- }
- return []corev1.EnvVar{
- {
- Name: "QUARKUS_DATASOURCE_USERNAME",
- ValueFrom: &corev1.EnvVarSource{
- SecretKeyRef: &corev1.SecretKeySelector{
- Key:
quarkusDatasourceUsername,
- LocalObjectReference: secretRef,
- },
- },
- },
- {
- Name: "QUARKUS_DATASOURCE_PASSWORD",
- ValueFrom: &corev1.EnvVarSource{
- SecretKeyRef: &corev1.SecretKeySelector{
- Key:
quarkusDatasourcePassword,
- LocalObjectReference: secretRef,
- },
- },
- },
- {
- Name: "QUARKUS_DATASOURCE_DB_KIND",
- Value: constants.PersistenceTypePostgreSQL,
- },
- {
- Name: "QUARKUS_FLYWAY_MIGRATE_AT_START",
- Value: "true",
- },
- {
- Name: "QUARKUS_DATASOURCE_JDBC_URL",
- Value: dataSourceURL,
- },
- }
-}
-
func (j JobServiceHandler) GenerateServiceProperties()
(*properties.Properties, error) {
props := properties.NewProperties()
props.Set(constants.KogitoServiceURLProperty,
generateServiceURL(constants.KogitoServiceURLProtocol, j.platform.Namespace,
j.GetServiceName()))
diff --git a/controllers/profiles/common/object_creators.go
b/controllers/profiles/common/object_creators.go
index 55c95d8a..79f2b660 100644
--- a/controllers/profiles/common/object_creators.go
+++ b/controllers/profiles/common/object_creators.go
@@ -29,6 +29,7 @@ import (
operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/persistence"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/properties"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
kubeutil
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
@@ -57,9 +58,12 @@ const (
healthStartedFailureThreshold = 5
healthStartedPeriodSeconds = 15
healthStartedInitialDelaySeconds = 10
+ defaultSchemaName = "default"
)
-var DefaultHTTPWorkflowPortIntStr =
intstr.FromInt(constants.DefaultHTTPWorkflowPortInt)
+var (
+ DefaultHTTPWorkflowPortIntStr =
intstr.FromInt(constants.DefaultHTTPWorkflowPortInt)
+)
// DeploymentCreator is an objectCreator for a base Kubernetes Deployments for
profiles that need to deploy the workflow on a vanilla deployment.
// It serves as a basis for a basic Quarkus Java application, expected to
listen on http 8080.
@@ -113,7 +117,7 @@ func defaultContainer(workflow *operatorapi.SonataFlow)
(*corev1.Container, erro
Name: utils.HttpScheme,
Protocol: corev1.ProtocolTCP,
}
- defaultFlowContainer := corev1.Container{
+ defaultFlowContainer := &corev1.Container{
Name: operatorapi.DefaultContainerName,
Ports:
[]corev1.ContainerPort{defaultContainerPort},
TerminationMessagePolicy:
corev1.TerminationMessageFallbackToLogsOnError,
@@ -151,9 +155,10 @@ func defaultContainer(workflow *operatorapi.SonataFlow)
(*corev1.Container, erro
SecurityContext: kubeutil.SecurityDefaults(),
}
// Merge with flowContainer
- if err := mergo.Merge(&defaultFlowContainer,
workflow.Spec.PodTemplate.Container.ToContainer(), mergo.WithOverride); err !=
nil {
+ if err := mergo.Merge(defaultFlowContainer,
workflow.Spec.PodTemplate.Container.ToContainer(), mergo.WithOverride); err !=
nil {
return nil, err
}
+ defaultFlowContainer = ConfigurePersistence(defaultFlowContainer,
workflow.Spec.Persistence, defaultSchemaName, workflow.Namespace)
// immutable
defaultFlowContainer.Name = operatorapi.DefaultContainerName
portIdx := -1
@@ -170,7 +175,7 @@ func defaultContainer(workflow *operatorapi.SonataFlow)
(*corev1.Container, erro
defaultFlowContainer.Ports[portIdx] = defaultContainerPort
}
- return &defaultFlowContainer, nil
+ return defaultFlowContainer, nil
}
// ServiceCreator is an objectCreator for a basic Service aiming a vanilla
Kubernetes Deployment.
@@ -213,3 +218,11 @@ func WorkflowPropsConfigMapCreator(workflow
*operatorapi.SonataFlow) (client.Obj
}
return workflowproj.CreateNewAppPropsConfigMap(workflow, props), nil
}
+
+func ConfigurePersistence(serviceContainer *corev1.Container, options
*operatorapi.PersistenceOptions, defaultSchema, namespace string)
*corev1.Container {
+ c := serviceContainer.DeepCopy()
+ if options != nil && options.PostgreSql != nil {
+ c.Env = append(c.Env,
persistence.ConfigurePostgreSqlEnv(options.PostgreSql, defaultSchema,
namespace)...)
+ }
+ return c
+}
diff --git a/controllers/profiles/common/object_creators_test.go
b/controllers/profiles/common/object_creators_test.go
index f942a56e..64e8f43b 100644
--- a/controllers/profiles/common/object_creators_test.go
+++ b/controllers/profiles/common/object_creators_test.go
@@ -169,3 +169,171 @@ func TestMergePodSpec_OverrideContainers(t *testing.T) {
assert.Equal(t, int32(8080), flowContainer.Ports[0].ContainerPort)
assert.Empty(t, flowContainer.Env)
}
+
+func TestMergePodSpec_WithPostgreSQL_and_JDBC_URL_field(t *testing.T) {
+ workflow := test.GetBaseSonataFlow(t.Name())
+ workflow.Spec = v1alpha08.SonataFlowSpec{
+ PodTemplate: v1alpha08.PodTemplateSpec{
+ Container: v1alpha08.ContainerSpec{
+ // this one we can override
+ Image: "quay.io/example/my-workflow:1.0.0",
+ Ports: []corev1.ContainerPort{
+ // let's override a immutable attribute
+ {Name: utils.HttpScheme, ContainerPort:
9090},
+ },
+ Env: []corev1.EnvVar{
+ // We should be able to override this
too
+ {Name: "ENV1", Value: "VALUE_CUSTOM"},
+ },
+ VolumeMounts: []corev1.VolumeMount{
+ {Name: "myvolume", ReadOnly: true,
MountPath: "/tmp/any/path"},
+ },
+ },
+ PodSpec: v1alpha08.PodSpec{
+ ServiceAccountName: "superuser",
+ Containers: []corev1.Container{
+ {
+ Name: "sidecar",
+ },
+ },
+ Volumes: []corev1.Volume{
+ {
+ Name: "myvolume",
+ VolumeSource:
corev1.VolumeSource{
+ ConfigMap:
&corev1.ConfigMapVolumeSource{
+
LocalObjectReference: corev1.LocalObjectReference{Name: "customproperties"},
+ },
+ },
+ },
+ },
+ },
+ },
+ Persistence: &v1alpha08.PersistenceOptions{
+ PostgreSql: &v1alpha08.PersistencePostgreSql{
+ SecretRef:
v1alpha08.PostgreSqlSecretOptions{Name: "test"},
+ JdbcUrl:
"jdbc:postgresql://host:port/database?currentSchema=workflow",
+ },
+ },
+ }
+
+ object, err := DeploymentCreator(workflow)
+ assert.NoError(t, err)
+
+ deployment := object.(*appsv1.Deployment)
+ expectedEnvVars := []corev1.EnvVar{
+ {
+ Name: "ENV1",
+ Value: "VALUE_CUSTOM",
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_USERNAME",
+ Value: "",
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ LocalObjectReference:
corev1.LocalObjectReference{Name: "test"}, Key: "POSTGRESQL_USER",
+ },
+ },
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_PASSWORD",
+ Value: "",
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ LocalObjectReference:
corev1.LocalObjectReference{Name: "test"}, Key: "POSTGRESQL_PASSWORD",
+ },
+ },
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_DB_KIND",
+ Value: "postgresql",
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_JDBC_URL",
+ Value:
"jdbc:postgresql://host:port/database?currentSchema=workflow",
+ },
+ }
+ assert.Len(t, deployment.Spec.Template.Spec.Containers, 2)
+ assert.Equal(t, "superuser",
deployment.Spec.Template.Spec.ServiceAccountName)
+ assert.Len(t, deployment.Spec.Template.Spec.Volumes, 1)
+ flowContainer, _ :=
kubeutil.GetContainerByName(v1alpha08.DefaultContainerName,
&deployment.Spec.Template.Spec)
+ assert.Equal(t, "quay.io/example/my-workflow:1.0.0",
flowContainer.Image)
+ assert.Equal(t, int32(8080), flowContainer.Ports[0].ContainerPort)
+ assert.Equal(t, expectedEnvVars, flowContainer.Env)
+ assert.Len(t, flowContainer.VolumeMounts, 1)
+}
+
+var (
+ postgreSQLPort = 5432
+)
+
+func TestMergePodSpec_OverrideContainers_WithPostgreSQL_and_ServiceRef(t
*testing.T) {
+ workflow := test.GetBaseSonataFlow(t.Name())
+ workflow.Spec = v1alpha08.SonataFlowSpec{
+ PodTemplate: v1alpha08.PodTemplateSpec{
+ PodSpec: v1alpha08.PodSpec{
+ // Try to override the workflow container via
the podspec
+ Containers: []corev1.Container{
+ {
+ Name:
v1alpha08.DefaultContainerName,
+ Image:
"quay.io/example/my-workflow:1.0.0",
+ Ports: []corev1.ContainerPort{
+ {Name:
utils.HttpScheme, ContainerPort: 9090},
+ },
+ Env: []corev1.EnvVar{
+ {Name: "ENV1", Value:
"VALUE_CUSTOM"},
+ },
+ },
+ },
+ },
+ },
+ Persistence: &v1alpha08.PersistenceOptions{
+ PostgreSql: &v1alpha08.PersistencePostgreSql{
+ SecretRef:
v1alpha08.PostgreSqlSecretOptions{Name: "test"},
+ ServiceRef: &v1alpha08.PostgreSqlServiceOptions{
+ Name: "test",
+ Namespace: "foo",
+ Port: &postgreSQLPort,
+ DatabaseName: "petstore",
+ DatabaseSchema: "bar"},
+ },
+ },
+ }
+
+ object, err := DeploymentCreator(workflow)
+ assert.NoError(t, err)
+
+ deployment := object.(*appsv1.Deployment)
+ expectedEnvVars := []corev1.EnvVar{
+ {
+ Name: "QUARKUS_DATASOURCE_USERNAME",
+ Value: "",
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ LocalObjectReference:
corev1.LocalObjectReference{Name: "test"}, Key: "POSTGRESQL_USER",
+ },
+ },
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_PASSWORD",
+ Value: "",
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ LocalObjectReference:
corev1.LocalObjectReference{Name: "test"}, Key: "POSTGRESQL_PASSWORD",
+ },
+ },
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_DB_KIND",
+ Value: "postgresql",
+ },
+ {
+ Name: "QUARKUS_DATASOURCE_JDBC_URL",
+ Value:
"jdbc:postgresql://test.foo:5432/petstore?currentSchema=bar",
+ },
+ }
+ assert.Len(t, deployment.Spec.Template.Spec.Containers, 1)
+ flowContainer, _ :=
kubeutil.GetContainerByName(v1alpha08.DefaultContainerName,
&deployment.Spec.Template.Spec)
+ assert.Empty(t, flowContainer.Image)
+ assert.Equal(t, int32(8080), flowContainer.Ports[0].ContainerPort)
+ assert.Equal(t, expectedEnvVars, flowContainer.Env)
+}
diff --git a/controllers/profiles/common/persistence/postgreSQL.go
b/controllers/profiles/common/persistence/postgreSQL.go
new file mode 100644
index 00000000..ebb695af
--- /dev/null
+++ b/controllers/profiles/common/persistence/postgreSQL.go
@@ -0,0 +1,94 @@
+// Copyright 2023 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package persistence
+
+import (
+ "strconv"
+
+ corev1 "k8s.io/api/core/v1"
+
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants"
+)
+
+const (
+ defaultSchemaName = "default"
+ defaultDatabaseName = "sonataflow"
+
+ quarkusDatasourceJDBCURL string = "QUARKUS_DATASOURCE_JDBC_URL"
+ quarkusDatasourceDBKind string = "QUARKUS_DATASOURCE_DB_KIND"
+ quarkusDatasourceUsername string = "QUARKUS_DATASOURCE_USERNAME"
+ quarkusDatasourcePassword string = "QUARKUS_DATASOURCE_PASSWORD"
+)
+
+func ConfigurePostgreSqlEnv(postgresql *operatorapi.PersistencePostgreSql,
databaseSchema, databaseNamespace string) []corev1.EnvVar {
+ dataSourcePort := constants.DefaultPostgreSQLPort
+ databaseName := defaultDatabaseName
+ dataSourceURL := postgresql.JdbcUrl
+ if postgresql.ServiceRef != nil {
+ if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
+ databaseSchema = postgresql.ServiceRef.DatabaseSchema
+ }
+ if len(postgresql.ServiceRef.Namespace) > 0 {
+ databaseNamespace = postgresql.ServiceRef.Namespace
+ }
+ if postgresql.ServiceRef.Port != nil {
+ dataSourcePort = *postgresql.ServiceRef.Port
+ }
+ if len(postgresql.ServiceRef.DatabaseName) > 0 {
+ databaseName = postgresql.ServiceRef.DatabaseName
+ }
+ dataSourceURL = "jdbc:" + constants.PersistenceTypePostgreSQL +
"://" + postgresql.ServiceRef.Name + "." + databaseNamespace + ":" +
strconv.Itoa(dataSourcePort) + "/" + databaseName + "?currentSchema=" +
databaseSchema
+ }
+ secretRef := corev1.LocalObjectReference{
+ Name: postgresql.SecretRef.Name,
+ }
+ postgresUsername := "POSTGRESQL_USER"
+ if len(postgresql.SecretRef.UserKey) > 0 {
+ postgresUsername = postgresql.SecretRef.UserKey
+ }
+ postgresPassword := "POSTGRESQL_PASSWORD"
+ if len(postgresql.SecretRef.PasswordKey) > 0 {
+ postgresPassword = postgresql.SecretRef.PasswordKey
+ }
+ return []corev1.EnvVar{
+ {
+ Name: quarkusDatasourceUsername,
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ Key: postgresUsername,
+ LocalObjectReference: secretRef,
+ },
+ },
+ },
+ {
+ Name: quarkusDatasourcePassword,
+ ValueFrom: &corev1.EnvVarSource{
+ SecretKeyRef: &corev1.SecretKeySelector{
+ Key: postgresPassword,
+ LocalObjectReference: secretRef,
+ },
+ },
+ },
+ {
+ Name: quarkusDatasourceDBKind,
+ Value: constants.PersistenceTypePostgreSQL,
+ },
+ {
+ Name: quarkusDatasourceJDBCURL,
+ Value: dataSourceURL,
+ },
+ }
+}
diff --git a/operator.yaml b/operator.yaml
index 044f6ffd..973b5cac 100644
--- a/operator.yaml
+++ b/operator.yaml
@@ -18698,6 +18698,65 @@ spec:
required:
- states
type: object
+ persistence:
+ description: Persists service to a datasource of choice.
Ephemeral
+ by default.
+ maxProperties: 1
+ properties:
+ postgresql:
+ description: Connect configured services to a postgresql
database.
+ maxProperties: 2
+ minProperties: 2
+ properties:
+ jdbcUrl:
+ description: PostgreSql JDBC URL. Mutually exclusive
to serviceRef.
+ e.g.
"jdbc:postgresql://host:port/database?currentSchema=data-index-service"
+ type: string
+ secretRef:
+ description: Secret reference to the database user
credentials
+ properties:
+ name:
+ description: Name of the postgresql credentials
secret.
+ type: string
+ passwordKey:
+ description: Defaults to POSTGRESQL_PASSWORD
+ type: string
+ userKey:
+ description: Defaults to POSTGRESQL_USER
+ type: string
+ required:
+ - name
+ type: object
+ serviceRef:
+ description: Service reference to postgresql
datasource. Mutually
+ exclusive to jdbcUrl.
+ properties:
+ databaseName:
+ description: Name of postgresql database to be
used. Defaults
+ to "sonataflow"
+ type: string
+ databaseSchema:
+ description: Schema of postgresql database to be
used.
+ Defaults to "data-index-service"
+ type: string
+ name:
+ description: Name of the postgresql k8s service.
+ type: string
+ namespace:
+ description: Namespace of the postgresql k8s
service.
+ Defaults to the SonataFlowPlatform's local
namespace.
+ type: string
+ port:
+ description: Port to use when connecting to the
postgresql
+ k8s service. Defaults to 5432.
+ type: integer
+ required:
+ - name
+ type: object
+ required:
+ - secretRef
+ type: object
+ type: object
podTemplate:
description: PodTemplate describes the deployment details of
this
SonataFlow instance.
diff --git a/test/e2e/helpers.go b/test/e2e/helpers.go
index 75b03997..28e747df 100644
--- a/test/e2e/helpers.go
+++ b/test/e2e/helpers.go
@@ -35,7 +35,16 @@ import (
. "github.com/onsi/gomega"
)
+// sonataflow_operator_namespace store the ns where the Operator and Operand
will be executed
+const sonataflow_operator_namespace = "sonataflow-operator-system"
+
type health struct {
+ Status string `json:"status"`
+ Checks []check `json:"checks"`
+}
+
+type check struct {
+ Name string `json:"name"`
Status string `json:"status"`
}
@@ -43,6 +52,24 @@ var (
upStatus string = "UP"
)
+func getHealthFromPod(name, namespace string) (*health, error) {
+ // iterate over all containers to find the one that responds to the
HTTP health endpoint
+ Expect(name).NotTo(BeEmpty(), "pod name is empty")
+ cmd := exec.Command("kubectl", "get", "pod", name, "-n", namespace,
"-o", `jsonpath={.spec.containers[*].name}`)
+ output, err := utils.Run(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ var errs error
+ for _, cname := range strings.Split(string(output), " ") {
+ var h *health
+ h, err = getHealthStatusInContainer(name, cname, namespace)
+ if err == nil {
+ return h, nil
+ }
+ errs = fmt.Errorf("%v; %w", err, errs)
+ }
+ return nil, errs
+}
+
func verifyHealthStatusInPod(name string, namespace string) {
// iterate over all containers to find the one that responds to the
HTTP health endpoint
Expect(name).NotTo(BeEmpty(), "pod name is empty")
diff --git a/test/e2e/platform_test.go b/test/e2e/platform_test.go
index e9b8edf1..24115567 100644
--- a/test/e2e/platform_test.go
+++ b/test/e2e/platform_test.go
@@ -27,6 +27,9 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
"github.com/apache/incubator-kie-kogito-serverless-operator/test/utils"
+
+ //nolint:golint
+ //nolint:revive
. "github.com/onsi/ginkgo/v2"
//nolint:golint
diff --git a/test/e2e/workflow_test.go b/test/e2e/workflow_test.go
index b84282e3..e05c843f 100644
--- a/test/e2e/workflow_test.go
+++ b/test/e2e/workflow_test.go
@@ -20,10 +20,12 @@
package e2e
import (
+ "bytes"
"fmt"
"math/rand"
"os/exec"
"path/filepath"
+ "strings"
"time"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
@@ -38,7 +40,7 @@ import (
. "github.com/onsi/gomega"
)
-var _ = Describe("SonataFlow Operator", func() {
+var _ = Describe("SonataFlow Operator", Ordered, func() {
var targetNamespace string
BeforeEach(func() {
@@ -135,6 +137,77 @@ var _ = Describe("SonataFlow Operator", func() {
return err
}, 2*time.Minute, time.Second).Should(Succeed())
})
+
})
})
+
+// Validates that a SonataFlow CR configured with PostgreSQL persistence gets
+// deployed and that its pod reports a healthy database connection.
+var _ = Describe("Validate the persistence ", Ordered, func() {
+
+	var (
+		ns string
+	)
+
+	BeforeEach(func() {
+		// Every spec runs in its own namespace, named with a random suffix
+		ns = fmt.Sprintf("test-%d", rand.Intn(1024)+1)
+		cmd := exec.Command("kubectl", "create", "namespace", ns)
+		_, err := utils.Run(cmd)
+		Expect(err).NotTo(HaveOccurred())
+	})
+	AfterEach(func() {
+		// Delete the test namespace (and everything deployed in it) if one was set
+		if len(ns) > 0 {
+			cmd := exec.Command("kubectl", "delete", "namespace", ns, "--wait")
+			_, err := utils.Run(cmd)
+			Expect(err).NotTo(HaveOccurred())
+		}
+
+	})
+
+	DescribeTable("when deploying a SonataFlow CR with PostgreSQL persistence", func(testcaseDir string) {
+		By("Deploy the CR")
+		var manifests []byte
+		// Render the kustomization first; the result is piped to "kubectl create" below
+		EventuallyWithOffset(1, func() error {
+			var err error
+			cmd := exec.Command("kubectl", "kustomize", testcaseDir)
+			manifests, err = utils.Run(cmd)
+			return err
+		}, time.Minute, time.Second).Should(Succeed())
+		cmd := exec.Command("kubectl", "create", "-n", ns, "-f", "-")
+		cmd.Stdin = bytes.NewBuffer(manifests)
+		_, err := utils.Run(cmd)
+		Expect(err).NotTo(HaveOccurred())
+		By("Wait for SonataFlow CR to complete deployment")
+		// wait for service deployments to be ready
+		EventuallyWithOffset(1, func() error {
+			cmd = exec.Command("kubectl", "wait", "pod", "-n", ns, "-l", "sonataflow.org/workflow-app=callbackstatetimeouts", "--for", "condition=Ready", "--timeout=5s")
+			out, err := utils.Run(cmd)
+			GinkgoWriter.Printf("%s\n", string(out))
+			return err
+		}, 10*time.Minute, 5*time.Second).Should(Succeed())
+
+		By("Evaluate status of the workflow's pod database connection health endpoint")
+		cmd = exec.Command("kubectl", "get", "pod", "-l", "sonataflow.org/workflow-app=callbackstatetimeouts", "-n", ns, "-ojsonpath={.items[*].metadata.name}")
+		output, err := utils.Run(cmd)
+		Expect(err).NotTo(HaveOccurred())
+		// Pods whose health endpoint cannot be queried yet are skipped and retried on the next poll.
+		// NOTE(review): the Expect calls below use the global fail handler, so a
+		// non-UP status presumably fails the spec right away instead of being
+		// retried - confirm that this is intended.
+		EventuallyWithOffset(1, func() bool {
+			for _, pn := range strings.Split(string(output), " ") {
+				h, err := getHealthFromPod(pn, ns)
+				if err != nil {
+					continue
+				}
+				Expect(h.Status).To(Equal(upStatus), "Pod health is not UP")
+				for _, c := range h.Checks {
+					if c.Name == "Database connections health check" {
+						Expect(c.Status).To(Equal(upStatus), "Pod's database connection is not UP")
+						return true
+					}
+				}
+			}
+			return false
+		}, 10*time.Minute).Should(BeTrue())
+	},
+		Entry("defined in the workflow from an existing kubernetes service as a reference", test.GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory("by_service")),
+	)
+
+})
diff --git a/test/testdata/persistence/workflow/by_service/01-postgres.yaml
b/test/testdata/persistence/workflow/by_service/01-postgres.yaml
new file mode 100644
index 00000000..662de4c7
--- /dev/null
+++ b/test/testdata/persistence/workflow/by_service/01-postgres.yaml
@@ -0,0 +1,86 @@
+# Copyright 2024 Apache Software Foundation (ASF)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres-pvc
+spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app.kubernetes.io/name: postgres
+ template:
+ metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ spec:
+ containers:
+ - name: postgres
+ image: postgres:13.2-alpine
+ imagePullPolicy: 'IfNotPresent'
+ ports:
+ - containerPort: 5432
+ volumeMounts:
+ - name: storage
+ mountPath: /var/lib/postgresql/data
+ envFrom:
+ - secretRef:
+ name: postgres-secrets
+ readinessProbe:
+ exec:
+ command: ["pg_isready"]
+ initialDelaySeconds: 15
+ timeoutSeconds: 2
+ livenessProbe:
+ exec:
+ command: ["pg_isready"]
+ initialDelaySeconds: 15
+ timeoutSeconds: 2
+ resources:
+ limits:
+ memory: "256Mi"
+ cpu: "500m"
+ volumes:
+ - name: storage
+ persistentVolumeClaim:
+ claimName: postgres-pvc
+---
+apiVersion: v1
+kind: Service
+metadata:
+ labels:
+ app.kubernetes.io/name: postgres
+ name: postgres
+spec:
+ selector:
+ app.kubernetes.io/name: postgres
+ ports:
+ - port: 5432
diff --git
a/test/testdata/persistence/workflow/by_service/02-sonataflow_platform.yaml
b/test/testdata/persistence/workflow/by_service/02-sonataflow_platform.yaml
new file mode 100644
index 00000000..5867f2d6
--- /dev/null
+++ b/test/testdata/persistence/workflow/by_service/02-sonataflow_platform.yaml
@@ -0,0 +1,27 @@
+# Copyright 2024 Apache Software Foundation (ASF)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlowPlatform
+metadata:
+ name: sonataflow-platform
+spec:
+ build:
+ template:
+ buildArgs:
+ - name: QUARKUS_EXTENSIONS
+ value:
org.kie.kogito:kogito-addons-quarkus-jobs-knative-eventing:999-SNAPSHOT,org.kie.kogito:kogito-addons-quarkus-persistence-jdbc:999-SNAPSHOT,io.quarkus:quarkus-jdbc-postgresql:3.2.9.Final,io.quarkus:quarkus-agroal:3.2.9.Final
+ config:
+ strategyOptions:
+ KanikoBuildCacheEnabled: "true"
diff --git
a/test/testdata/persistence/workflow/by_service/03-sonataflow_callbackstatetimeouts.sw.yaml
b/test/testdata/persistence/workflow/by_service/03-sonataflow_callbackstatetimeouts.sw.yaml
new file mode 100644
index 00000000..9d0fcbc7
--- /dev/null
+++
b/test/testdata/persistence/workflow/by_service/03-sonataflow_callbackstatetimeouts.sw.yaml
@@ -0,0 +1,102 @@
+# Copyright 2024 Apache Software Foundation (ASF)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: sonataflow.org/v1alpha08
+kind: SonataFlow
+metadata:
+ name: callbackstatetimeouts
+ annotations:
+ sonataflow.org/description: Callback State Timeouts Example k8s
+ sonataflow.org/version: 0.0.1
+spec:
+ persistence:
+ postgresql:
+ secretRef:
+ name: postgres-secrets
+ userKey: POSTGRES_USER
+ passwordKey: POSTGRES_PASSWORD
+ serviceRef:
+ name: postgres
+ port: 5432
+ databaseName: sonataflow
+ databaseSchema: callbackstatetimeouts
+ podTemplate:
+ container:
+ env:
+ - name: QUARKUS_FLYWAY_MIGRATE_AT_START
+ value: "true"
+ initContainers:
+ - name: init-postgres
+ image: registry.access.redhat.com/ubi9/ubi-micro:latest
+ imagePullPolicy: IfNotPresent
+ #TODO: https://issues.redhat.com/browse/KOGITO-9840
+ command: [ 'sh', '-c', 'until (echo 1 > /dev/tcp/postgres.$(cat
/var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local/5432)
>/dev/null 2>&1; do echo "Waiting for postgres server"; sleep 3; done;' ]
+ flow:
+ start: PrintStartMessage
+ events:
+ - name: callbackEvent
+ source: ''
+ type: callback_event_type
+ functions:
+ - name: systemOut
+ type: custom
+ operation: sysout
+ states:
+ - name: PrintStartMessage
+ type: operation
+ actions:
+ - name: printSystemOut
+ functionRef:
+ refName: systemOut
+ arguments:
+ message: "${\"callback-state-timeouts: \" +
$WORKFLOW.instanceId + \" has started.\"}"
+ transition: CallbackState
+ - name: CallbackState
+ type: callback
+ action:
+ name: callbackAction
+ functionRef:
+ refName: systemOut
+ arguments:
+ message: "${\"callback-state-timeouts: \" + $WORKFLOW.instanceId
+ \" has executed the callbackFunction.\"}"
+ eventRef: callbackEvent
+ transition: CheckEventArrival
+ timeouts:
+ eventTimeout: PT30S
+ - name: CheckEventArrival
+ type: switch
+ dataConditions:
+ - condition: "${ .eventData != null }"
+ transition: EventArrived
+ defaultCondition:
+ transition: EventNotArrived
+ - name: EventArrived
+ type: inject
+ data:
+ exitMessage: "The callback event has arrived."
+ transition: PrintExitMessage
+ - name: EventNotArrived
+ type: inject
+ data:
+ exitMessage: "The callback event has not arrived, and the timeout
has overdue."
+ transition: PrintExitMessage
+ - name: PrintExitMessage
+ type: operation
+ actions:
+ - name: printSystemOut
+ functionRef:
+ refName: systemOut
+ arguments:
+ message: "${\"callback-state-timeouts: \" +
$WORKFLOW.instanceId + \" has finalized. \" + .exitMessage + \" eventData: \" +
.eventData}"
+ end: true
diff --git a/test/testdata/persistence/workflow/by_service/kustomization.yaml
b/test/testdata/persistence/workflow/by_service/kustomization.yaml
new file mode 100644
index 00000000..b7f587bc
--- /dev/null
+++ b/test/testdata/persistence/workflow/by_service/kustomization.yaml
@@ -0,0 +1,33 @@
+# Copyright 2024 Apache Software Foundation (ASF)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Manifests for the "by_service" persistence e2e scenario.
+resources:
+- 01-postgres.yaml
+- 02-sonataflow_platform.yaml
+- 03-sonataflow_callbackstatetimeouts.sw.yaml
+
+# Keep the generated Secret name free of a hash suffix so that the other
+# manifests can reference it by the fixed name "postgres-secrets".
+generatorOptions:
+ disableNameSuffixHash: true
+
+# Consumed by the postgres Deployment (envFrom) and by the SonataFlow CR's
+# persistence secretRef (POSTGRES_USER / POSTGRES_PASSWORD keys).
+secretGenerator:
+ - name: postgres-secrets
+ literals:
+ - POSTGRES_USER=sonataflow
+ - POSTGRES_PASSWORD=sonataflow
+ # NOTE(review): the official postgres image documents POSTGRES_DB, not
+ # POSTGRES_DATABASE - verify that this key is actually consumed.
+ - POSTGRES_DATABASE=sonataflow
+ # NOTE(review): 01-postgres.yaml mounts the PVC at /var/lib/postgresql/data,
+ # so this PGDATA presumably places the data outside the volume - confirm.
+ - PGDATA=/var/lib/pgsql/data/userdata
+
+sortOptions:
+ order: fifo
+
diff --git a/test/yaml.go b/test/yaml.go
index 5edbc605..37d28220 100644
--- a/test/yaml.go
+++ b/test/yaml.go
@@ -160,26 +160,37 @@ func GetSonataFlowBuilderConfig(namespace string)
*corev1.ConfigMap {
return cm
}
-func GetBaseSonataFlow(namespace string) *operatorapi.SonataFlow {
- return GetSonataFlow(sonataFlowSampleYamlCR, namespace)
+// NewSonataFlow returns the SonataFlow that GetSonataFlow yields for the given
+// file path and namespace, after applying each option to it in order.
+func NewSonataFlow(filePath string, namespace string, options
...func(*operatorapi.SonataFlow)) *operatorapi.SonataFlow {
+ sf := GetSonataFlow(filePath, namespace)
+ for _, f := range options {
+ f(sf)
+ }
+ return sf
}
-func GetBaseSonataFlowWithDevProfile(namespace string) *operatorapi.SonataFlow
{
- workflow := GetBaseSonataFlow(namespace)
+func SetDevProfile(workflow *operatorapi.SonataFlow) {
workflow.Annotations["sonataflow.org/profile"] = "dev"
- return workflow
}
-func GetBaseSonataFlowWithProdProfile(namespace string)
*operatorapi.SonataFlow {
- workflow := GetBaseSonataFlow(namespace)
+func SetProdProfile(workflow *operatorapi.SonataFlow) {
workflow.Annotations["sonataflow.org/profile"] = "prod"
- return workflow
+}
+
+// GetBaseSonataFlow gets the workflow defined by sonataFlowSampleYamlCR for the
+// given namespace. Every non-nil option is applied to the workflow, in order,
+// before it is returned.
+func GetBaseSonataFlow(namespace string, options ...*func(*operatorapi.SonataFlow)) *operatorapi.SonataFlow {
+	sf := NewSonataFlow(sonataFlowSampleYamlCR, namespace)
+	// The options were previously accepted but never used. They are pointers
+	// to functions, so guard against nil before dereferencing and calling.
+	for _, option := range options {
+		if option != nil && *option != nil {
+			(*option)(sf)
+		}
+	}
+	return sf
+}
+
+func GetBaseSonataFlowWithDevProfile(namespace string) *operatorapi.SonataFlow
{
+ return NewSonataFlow(sonataFlowSampleYamlCR, namespace, SetDevProfile)
+}
+
+func GetBaseSonataFlowWithProdProfile(namespace string)
*operatorapi.SonataFlow {
+ return NewSonataFlow(sonataFlowSampleYamlCR, namespace, SetProdProfile)
}
// GetBaseSonataFlowWithProdOpsProfile gets a base workflow that has a
pre-built image set in podTemplate.
func GetBaseSonataFlowWithProdOpsProfile(namespace string)
*operatorapi.SonataFlow {
- workflow := GetSonataFlow(SonataFlowSimpleOpsYamlCR, namespace)
- return workflow
+ return NewSonataFlow(SonataFlowSimpleOpsYamlCR, namespace)
}
func GetBasePlatformInReadyPhase(namespace string)
*operatorapi.SonataFlowPlatform {
@@ -221,6 +232,9 @@ func GetSonataFlowE2eOrderProcessingFolder() string {
func GetSonataFlowE2EPlatformServicesDirectory() string {
return filepath.Join(getTestDataDir(), "platform", "services")
}
+// GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory returns the path of the
+// given sub-directory of the "persistence/workflow" folder in the test data directory.
+func GetSonataFlowE2EWorkflowPersistenceSampleDataDirectory(subdir string) string {
+	persistenceWorkflowDir := filepath.Join(getTestDataDir(), "persistence", "workflow")
+	return filepath.Join(persistenceWorkflowDir, subdir)
+}
// getTestDataDir gets the testdata directory containing every sample out
there from test/testdata.
// It should be used for every testing unit within the module.
diff --git a/workflowproj/workflowproj_test.go
b/workflowproj/workflowproj_test.go
index bb0daa8a..0bc9679c 100644
--- a/workflowproj/workflowproj_test.go
+++ b/workflowproj/workflowproj_test.go
@@ -20,6 +20,7 @@
package workflowproj
import (
+ "fmt"
"io"
"os"
"path"
@@ -111,6 +112,8 @@ func
Test_Handler_WorkflowMinimalAndPropsAndSpecAndGeneric(t *testing.T) {
assert.Equal(t, "02-hello-resources", proj.Resources[1].Name)
assert.Equal(t,
proj.Workflow.Spec.Resources.ConfigMaps[0].ConfigMap.Name,
proj.Resources[0].Name)
assert.Equal(t,
proj.Workflow.Spec.Resources.ConfigMaps[1].ConfigMap.Name,
proj.Resources[1].Name)
+ assert.NotEmpty(t, proj.Resources[0].Data, fmt.Sprintf("Data in
proj.Resources[0] is empty %+v", proj.Resources[0]))
+ assert.NotEmpty(t, proj.Resources[1].Data, fmt.Sprintf("Data in
proj.Resources[1] is empty %+v", proj.Resources[1]))
assert.NotEmpty(t, proj.Resources[0].Data["myopenapi.json"])
assert.NotEmpty(t, proj.Resources[1].Data["input.json"])
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]