wmedvede commented on code in PR #2790:
URL:
https://github.com/apache/incubator-kie-tools/pull/2790#discussion_r1918428174
##########
packages/sonataflow-operator/internal/controller/platform/k8s.go:
##########
@@ -62,20 +63,86 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
+func (action *serviceAction) createOrUpdateDBMigrationJob(ctx context.Context,
client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler)
(*DBMigratorJob, error) {
+ dbMigratorJob := NewDBMigratorJobData(ctx, action.client, platform,
pshDI, pshJS)
+
+ // Invoke DB Migration only if both or either DI/JS services are
requested, in addition to DBMigrationStrategyJob
+ if dbMigratorJob != nil {
+ klog.V(log.I).InfoS("Starting DB Migration Job: ")
+
+ job := dbMigratorJob.CreateJobDBMigration(platform)
+ if op, err := controllerutil.CreateOrUpdate(ctx, client, job,
func() error {
+ return nil
+ }); err != nil {
+ return dbMigratorJob, err
+ } else {
+ klog.V(log.I).InfoS("DB Migration Job successfully
created on cluster", "operation", op)
+ }
+ }
+ return dbMigratorJob, nil
+}
+
+func (action *serviceAction) handleDBMigrationJob(ctx context.Context,
platform *operatorapi.SonataFlowPlatform, psDI services.PlatformServiceHandler,
psJS services.PlatformServiceHandler) (*operatorapi.SonataFlowPlatform, error) {
Review Comment:
same for this method, I don't believe it belongs to the serviceAction struct
##########
packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package platform
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/pointer"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/cfg"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+type QuarkusDataSource struct {
+ JdbcUrl string
+ Username string
+ Password string
+ Schema string
+}
+
+type DBMigratorJob struct {
+ MigrateDBDataIndex bool
+ DataIndexDataSource *QuarkusDataSource
+ MigrateDBJobsService bool
+ JobsServiceDataSource *QuarkusDataSource
+}
+
+const (
+ dbMigrationJobName = "sonataflow-db-migrator-job"
+ dbMigrationContainerName = "db-migration-container"
+ dbMigratorToolImage =
"quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest"
+ dbMigrationCmd = "./migration.sh"
+ dbMigrationJobFailed = 1
+ dbMigrationJobSucceeded = 1
+
+ migrateDBDataIndex = "MIGRATE_DB_DATAINDEX"
+ quarkusDataSourceDataIndexJdbcURL =
"QUARKUS_DATASOURCE_DATAINDEX_JDBC_URL"
+ quarkusDataSourceDataIndexUserName =
"QUARKUS_DATASOURCE_DATAINDEX_USERNAME"
+ quarkusDataSourceDataIndexPassword =
"QUARKUS_DATASOURCE_DATAINDEX_PASSWORD"
+ quarkusFlywayDataIndexSchemas = "QUARKUS_FLYWAY_DATAINDEX_SCHEMAS"
+
+ migrateDBJobsService = "MIGRATE_DB_JOBSSERVICE"
+ quarkusDataSourceJobsServiceJdbcURL =
"QUARKUS_DATASOURCE_JOBSSERVICE_JDBC_URL"
+ quarkusDataSourceJobsServiceUserName =
"QUARKUS_DATASOURCE_JOBSSERVICE_USERNAME"
+ quarkusDataSourceJobsServicePassword =
"QUARKUS_DATASOURCE_JOBSSERVICE_PASSWORD"
+ quarkusFlywayJobsServiceSchemas =
"QUARKUS_FLYWAY_JOBSSERVICE_SCHEMAS"
+)
+
+type DBMigrationJobCfg struct {
+ JobName string
+ ContainerName string
+ ToolImageName string
+ MigrationCmd string
+}
+
+func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL,
defaultSchemaName string) string {
+ if persistencePostgreSQL != nil && persistencePostgreSQL.ServiceRef !=
nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 {
+ return persistencePostgreSQL.ServiceRef.DatabaseSchema
+ }
+
+ if persistencePostgreSQL != nil && len(persistencePostgreSQL.JdbcUrl) >
0 {
+ jdbcURL := persistencePostgreSQL.JdbcUrl
+ _, a, found := strings.Cut(jdbcURL, "currentSchema=")
+
+ if found {
+ if strings.Contains(a, "&") {
+ b, _, found := strings.Cut(a, "&")
+ if found {
+ return b
+ }
+ } else {
+ return a
+ }
+ }
+ }
+ return defaultSchemaName
+}
+
+func getJdbcUrl(postgresql *operatorapi.PersistencePostgreSQL,
defaultDBSchemaName string) string {
+ databaseSchema := defaultDBSchemaName
+ dataSourcePort := constants.DefaultPostgreSQLPort
+ databaseName := constants.DefaultDatabaseName
+ postgresServiceName := constants.DefaultPostgresServiceName
+
+ dataSourceURL := ""
+
+ if postgresql.ServiceRef != nil {
+ if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
+ databaseSchema = postgresql.ServiceRef.DatabaseSchema
+ }
+ if postgresql.ServiceRef.Port != nil {
+ dataSourcePort = *postgresql.ServiceRef.Port
+ }
+ if len(postgresql.ServiceRef.DatabaseName) > 0 {
+ databaseName = postgresql.ServiceRef.DatabaseName
+ }
+ dataSourceURL =
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s",
postgresql.ServiceRef.Name, dataSourcePort, databaseName, databaseSchema)
+ } else if len(postgresql.JdbcUrl) > 0 {
+ dataSourceURL = postgresql.JdbcUrl
+ } else {
+ dataSourceURL =
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s", postgresServiceName,
dataSourcePort, databaseName, databaseSchema)
+ }
+
+ return dataSourceURL
+}
+
+func getQuarkusDataSourceFromPersistence(ctx context.Context, platform
*operatorapi.SonataFlowPlatform, persistence
*operatorapi.PersistenceOptionsSpec, defaultSchemaName string)
*QuarkusDataSource {
+ if persistence != nil && persistence.PostgreSQL != nil {
+ quarkusDataSource := &QuarkusDataSource{}
+ quarkusDataSource.JdbcUrl = getJdbcUrl(persistence.PostgreSQL,
defaultSchemaName)
+ quarkusDataSource.Username, _ =
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name,
persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace)
+ quarkusDataSource.Password, _ =
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name,
persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace)
+ quarkusDataSource.Schema =
getDBSchemaName(persistence.PostgreSQL, defaultSchemaName)
+ return quarkusDataSource
+ }
+
+ return nil
+}
+
+func NewDBMigratorJobData(ctx context.Context, client client.Client, platform
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS
services.PlatformServiceHandler) *DBMigratorJob {
+
+ diJobsBasedDBMigration := false
+ jsJobsBasedDBMigration := false
+
+ if pshDI.IsPersistenceEnabledtInSpec() {
+ diJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence)
+ }
+ if pshJS.IsPersistenceEnabledtInSpec() {
+ jsJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence)
+ }
+
+ if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) ||
(pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) {
+ quarkusDataSourceDataIndex := &QuarkusDataSource{}
+ quarkusDataSourceJobService := &QuarkusDataSource{}
+
+ if diJobsBasedDBMigration {
+ quarkusDataSourceDataIndex =
getQuarkusDataSourceFromPersistence(ctx, platform,
platform.Spec.Services.DataIndex.Persistence, "data-index-service")
+ }
+
+ if jsJobsBasedDBMigration {
+ quarkusDataSourceJobService =
getQuarkusDataSourceFromPersistence(ctx, platform,
platform.Spec.Services.JobService.Persistence, "jobs-service")
+ }
+
+ return &DBMigratorJob{
+ MigrateDBDataIndex: diJobsBasedDBMigration,
+ DataIndexDataSource: quarkusDataSourceDataIndex,
+ MigrateDBJobsService: jsJobsBasedDBMigration,
+ JobsServiceDataSource: quarkusDataSourceJobService,
+ }
+ }
+ return nil
+}
+
+func newQuarkusDataSource(jdbcURL string, userName string, password string,
schema string) *QuarkusDataSource {
+ return &QuarkusDataSource{
+ JdbcUrl: jdbcURL,
+ Username: userName,
+ Password: password,
+ Schema: schema,
+ }
+}
+
+func (dbmj DBMigratorJob) CreateJobDBMigration(platform
*operatorapi.SonataFlowPlatform) *batchv1.Job {
+
+ diQuarkusDataSource := newQuarkusDataSource("", "", "", "")
+ jsQuarkusDataSource := newQuarkusDataSource("", "", "", "")
+
+ if dbmj.DataIndexDataSource != nil {
+ diQuarkusDataSource.JdbcUrl = dbmj.DataIndexDataSource.JdbcUrl
+ diQuarkusDataSource.Username = dbmj.DataIndexDataSource.Username
+ diQuarkusDataSource.Password = dbmj.DataIndexDataSource.Password
+ diQuarkusDataSource.Schema = dbmj.DataIndexDataSource.Schema
+ }
+
+ if dbmj.JobsServiceDataSource != nil {
+ jsQuarkusDataSource.JdbcUrl = dbmj.JobsServiceDataSource.JdbcUrl
+ jsQuarkusDataSource.Username =
dbmj.JobsServiceDataSource.Username
+ jsQuarkusDataSource.Password =
dbmj.JobsServiceDataSource.Password
+ jsQuarkusDataSource.Schema = dbmj.JobsServiceDataSource.Schema
+ }
+
+ dbMigrationJobCfg := newDBMigrationJobCfg()
+ job := &batchv1.Job{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: dbMigrationJobCfg.JobName,
+ Namespace: platform.Namespace,
+ },
+ Spec: batchv1.JobSpec{
+ Template: corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Name:
dbMigrationJobCfg.ContainerName,
+ Image:
dbMigrationJobCfg.ToolImageName,
+ Env: []corev1.EnvVar{
+ {
+ Name:
migrateDBDataIndex,
+ Value:
strconv.FormatBool(dbmj.MigrateDBDataIndex),
+ },
+ {
+ Name:
quarkusDataSourceDataIndexJdbcURL,
+ Value:
diQuarkusDataSource.JdbcUrl,
+ },
+ {
+ Name:
quarkusDataSourceDataIndexUserName,
+ Value:
diQuarkusDataSource.Username,
+ },
+ {
+ Name:
quarkusDataSourceDataIndexPassword,
+ Value:
diQuarkusDataSource.Password,
+ },
+ {
+ Name:
quarkusFlywayDataIndexSchemas,
+ Value:
diQuarkusDataSource.Schema,
+ },
+ {
+ Name:
migrateDBJobsService,
+ Value:
strconv.FormatBool(dbmj.MigrateDBJobsService),
+ },
+ {
+ Name:
quarkusDataSourceJobsServiceJdbcURL,
+ Value:
jsQuarkusDataSource.JdbcUrl,
+ },
+ {
+ Name:
quarkusDataSourceJobsServiceUserName,
+ Value:
jsQuarkusDataSource.Username,
+ },
+ {
+ Name:
quarkusDataSourceJobsServicePassword,
+ Value:
jsQuarkusDataSource.Password,
+ },
+ {
+ Name:
quarkusFlywayJobsServiceSchemas,
+ Value:
jsQuarkusDataSource.Schema,
+ },
+ },
+ Command: []string{
+
dbMigrationJobCfg.MigrationCmd,
+ },
+ },
+ },
+ RestartPolicy: "Never",
+ },
+ },
+ BackoffLimit: pointer.Int32(0),
+ },
+ }
+ return job
+}
+
+// GetDBMigrationJobStatus Returns db migration job status
+func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus,
error) {
+ job, err := client.BatchV1().Jobs(platform.Namespace).Get(ctx,
dbMigrationJobName, metav1.GetOptions{})
+ if err != nil {
+ klog.V(log.E).InfoS("Error getting DB migrator job while
monitoring completion: ", "error", err)
+ return nil, err
+ }
+ return &job.Status, nil
+}
+
+// NewSonataFlowPlatformDBMigrationPhase Returns a new DB migration phase for
SonataFlowPlatform
+func NewSonataFlowPlatformDBMigrationPhase(status
operatorapi.DBMigrationStatus, message string, reason string)
*operatorapi.SonataFlowPlatformDBMigrationPhase {
+ return &operatorapi.SonataFlowPlatformDBMigrationPhase{
+ Status: status,
+ Message: message,
+ Reason: reason,
+ }
+}
+
+// UpdateSonataFlowPlatformDBMigrationPhase Updates a given
SonataFlowPlatformDBMigrationPhase with the supplied values
+func UpdateSonataFlowPlatformDBMigrationPhase(dbMigrationStatus
*operatorapi.SonataFlowPlatformDBMigrationPhase, status
operatorapi.DBMigrationStatus, message string, reason string)
*operatorapi.SonataFlowPlatformDBMigrationPhase {
+ if dbMigrationStatus != nil {
+ dbMigrationStatus.Status = status
+ dbMigrationStatus.Message = message
+ dbMigrationStatus.Reason = reason
+ return dbMigrationStatus
+ }
+ return nil
+}
+
+func getKogitoDBMigratorToolImageName() string {
+
+ imgTag := cfg.GetCfg().KogitoDBMigratorToolImageTag
+
+ if imgTag == "" {
+ // returns
"docker.io/apache/incubator-kie-kogito-db-migrator-tool:<tag>"
+ imgTag = fmt.Sprintf("%s-%s:%s", constants.ImageNamePrefix,
constants.KogitoDBMigratorTool, version.GetImageTagVersion())
+ }
+ return imgTag
+}
+
+func newDBMigrationJobCfg() *DBMigrationJobCfg {
+ return &DBMigrationJobCfg{
+ JobName: dbMigrationJobName,
+ ContainerName: dbMigrationContainerName,
+ ToolImageName: getKogitoDBMigratorToolImageName(),
+ MigrationCmd: dbMigrationCmd,
+ }
+}
+
+// ReconcileDBMigrationJob Check the status of running DB migration job and
return status
+func (dbmj DBMigratorJob) ReconcileDBMigrationJob(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus,
error) {
+ platform.Status.SonataFlowPlatformDBMigrationPhase =
NewSonataFlowPlatformDBMigrationPhase(operatorapi.DBMigrationStatusStarted,
operatorapi.MessageDBMigrationStatusStarted,
operatorapi.ReasonDBMigrationStatusStarted)
+
+ dbMigratorJobStatus, err := dbmj.GetDBMigrationJobStatus(ctx, client,
platform)
+ if err != nil {
+ return nil, err
+ }
+
+ klog.V(log.I).InfoS("Db migration job status: ", "active",
dbMigratorJobStatus.Active, "ready", dbMigratorJobStatus.Ready, "failed",
dbMigratorJobStatus.Failed, "success", dbMigratorJobStatus.Succeeded,
"CompletedIndexes", dbMigratorJobStatus.CompletedIndexes, "terminatedPods",
dbMigratorJobStatus.UncountedTerminatedPods)
+
+ if dbMigratorJobStatus.Failed == dbMigrationJobFailed {
+ platform.Status.SonataFlowPlatformDBMigrationPhase =
UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase,
operatorapi.DBMigrationStatusFailed,
operatorapi.MessageDBMigrationStatusFailed,
operatorapi.ReasonDBMigrationStatusFailed)
+ klog.V(log.I).InfoS("DB migration job failed")
+ return dbMigratorJobStatus, errors.New("DB migration job
failed")
Review Comment:
Information about the job namespace and name would be nice here for future
error tracing.
##########
packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package platform
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/pointer"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/cfg"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+type QuarkusDataSource struct {
+ JdbcUrl string
+ Username string
+ Password string
+ Schema string
+}
+
+type DBMigratorJob struct {
+ MigrateDBDataIndex bool
+ DataIndexDataSource *QuarkusDataSource
+ MigrateDBJobsService bool
+ JobsServiceDataSource *QuarkusDataSource
+}
+
+const (
+ dbMigrationJobName = "sonataflow-db-migrator-job"
+ dbMigrationContainerName = "db-migration-container"
+ dbMigratorToolImage =
"quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest"
+ dbMigrationCmd = "./migration.sh"
+ dbMigrationJobFailed = 1
+ dbMigrationJobSucceeded = 1
+
+ migrateDBDataIndex = "MIGRATE_DB_DATAINDEX"
+ quarkusDataSourceDataIndexJdbcURL =
"QUARKUS_DATASOURCE_DATAINDEX_JDBC_URL"
+ quarkusDataSourceDataIndexUserName =
"QUARKUS_DATASOURCE_DATAINDEX_USERNAME"
+ quarkusDataSourceDataIndexPassword =
"QUARKUS_DATASOURCE_DATAINDEX_PASSWORD"
+ quarkusFlywayDataIndexSchemas = "QUARKUS_FLYWAY_DATAINDEX_SCHEMAS"
+
+ migrateDBJobsService = "MIGRATE_DB_JOBSSERVICE"
+ quarkusDataSourceJobsServiceJdbcURL =
"QUARKUS_DATASOURCE_JOBSSERVICE_JDBC_URL"
+ quarkusDataSourceJobsServiceUserName =
"QUARKUS_DATASOURCE_JOBSSERVICE_USERNAME"
+ quarkusDataSourceJobsServicePassword =
"QUARKUS_DATASOURCE_JOBSSERVICE_PASSWORD"
+ quarkusFlywayJobsServiceSchemas =
"QUARKUS_FLYWAY_JOBSSERVICE_SCHEMAS"
+)
+
+type DBMigrationJobCfg struct {
+ JobName string
+ ContainerName string
+ ToolImageName string
+ MigrationCmd string
+}
+
+func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL,
defaultSchemaName string) string {
+ if persistencePostgreSQL != nil && persistencePostgreSQL.ServiceRef !=
nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 {
+ return persistencePostgreSQL.ServiceRef.DatabaseSchema
+ }
+
+ if persistencePostgreSQL != nil && len(persistencePostgreSQL.JdbcUrl) >
0 {
+ jdbcURL := persistencePostgreSQL.JdbcUrl
+ _, a, found := strings.Cut(jdbcURL, "currentSchema=")
+
+ if found {
+ if strings.Contains(a, "&") {
+ b, _, found := strings.Cut(a, "&")
+ if found {
+ return b
+ }
+ } else {
+ return a
+ }
+ }
+ }
+ return defaultSchemaName
+}
+
+func getJdbcUrl(postgresql *operatorapi.PersistencePostgreSQL,
defaultDBSchemaName string) string {
+ databaseSchema := defaultDBSchemaName
+ dataSourcePort := constants.DefaultPostgreSQLPort
+ databaseName := constants.DefaultDatabaseName
+ postgresServiceName := constants.DefaultPostgresServiceName
+
+ dataSourceURL := ""
+
+ if postgresql.ServiceRef != nil {
+ if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
+ databaseSchema = postgresql.ServiceRef.DatabaseSchema
+ }
+ if postgresql.ServiceRef.Port != nil {
+ dataSourcePort = *postgresql.ServiceRef.Port
+ }
+ if len(postgresql.ServiceRef.DatabaseName) > 0 {
+ databaseName = postgresql.ServiceRef.DatabaseName
+ }
+ dataSourceURL =
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s",
postgresql.ServiceRef.Name, dataSourcePort, databaseName, databaseSchema)
+ } else if len(postgresql.JdbcUrl) > 0 {
+ dataSourceURL = postgresql.JdbcUrl
+ } else {
+ dataSourceURL =
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s", postgresServiceName,
dataSourcePort, databaseName, databaseSchema)
+ }
+
+ return dataSourceURL
+}
+
+func getQuarkusDataSourceFromPersistence(ctx context.Context, platform
*operatorapi.SonataFlowPlatform, persistence
*operatorapi.PersistenceOptionsSpec, defaultSchemaName string)
*QuarkusDataSource {
+ if persistence != nil && persistence.PostgreSQL != nil {
+ quarkusDataSource := &QuarkusDataSource{}
+ quarkusDataSource.JdbcUrl = getJdbcUrl(persistence.PostgreSQL,
defaultSchemaName)
+ quarkusDataSource.Username, _ =
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name,
persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace)
+ quarkusDataSource.Password, _ =
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name,
persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace)
+ quarkusDataSource.Schema =
getDBSchemaName(persistence.PostgreSQL, defaultSchemaName)
+ return quarkusDataSource
+ }
+
+ return nil
+}
+
+func NewDBMigratorJobData(ctx context.Context, client client.Client, platform
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS
services.PlatformServiceHandler) *DBMigratorJob {
+
+ diJobsBasedDBMigration := false
+ jsJobsBasedDBMigration := false
+
+ if pshDI.IsPersistenceEnabledtInSpec() {
+ diJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence)
+ }
+ if pshJS.IsPersistenceEnabledtInSpec() {
+ jsJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence)
+ }
+
+ if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) ||
(pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) {
+ quarkusDataSourceDataIndex := &QuarkusDataSource{}
+ quarkusDataSourceJobService := &QuarkusDataSource{}
+
+ if diJobsBasedDBMigration {
+ quarkusDataSourceDataIndex =
getQuarkusDataSourceFromPersistence(ctx, platform,
platform.Spec.Services.DataIndex.Persistence, "data-index-service")
+ }
+
+ if jsJobsBasedDBMigration {
+ quarkusDataSourceJobService =
getQuarkusDataSourceFromPersistence(ctx, platform,
platform.Spec.Services.JobService.Persistence, "jobs-service")
+ }
+
+ return &DBMigratorJob{
+ MigrateDBDataIndex: diJobsBasedDBMigration,
+ DataIndexDataSource: quarkusDataSourceDataIndex,
+ MigrateDBJobsService: jsJobsBasedDBMigration,
+ JobsServiceDataSource: quarkusDataSourceJobService,
+ }
+ }
+ return nil
+}
+
+func newQuarkusDataSource(jdbcURL string, userName string, password string,
schema string) *QuarkusDataSource {
+ return &QuarkusDataSource{
+ JdbcUrl: jdbcURL,
+ Username: userName,
+ Password: password,
+ Schema: schema,
+ }
+}
+
+func (dbmj DBMigratorJob) CreateJobDBMigration(platform
*operatorapi.SonataFlowPlatform) *batchv1.Job {
+
+ diQuarkusDataSource := newQuarkusDataSource("", "", "", "")
+ jsQuarkusDataSource := newQuarkusDataSource("", "", "", "")
+
+ if dbmj.DataIndexDataSource != nil {
+ diQuarkusDataSource.JdbcUrl = dbmj.DataIndexDataSource.JdbcUrl
+ diQuarkusDataSource.Username = dbmj.DataIndexDataSource.Username
+ diQuarkusDataSource.Password = dbmj.DataIndexDataSource.Password
+ diQuarkusDataSource.Schema = dbmj.DataIndexDataSource.Schema
+ }
+
+ if dbmj.JobsServiceDataSource != nil {
+ jsQuarkusDataSource.JdbcUrl = dbmj.JobsServiceDataSource.JdbcUrl
+ jsQuarkusDataSource.Username =
dbmj.JobsServiceDataSource.Username
+ jsQuarkusDataSource.Password =
dbmj.JobsServiceDataSource.Password
+ jsQuarkusDataSource.Schema = dbmj.JobsServiceDataSource.Schema
+ }
+
+ dbMigrationJobCfg := newDBMigrationJobCfg()
+ job := &batchv1.Job{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: dbMigrationJobCfg.JobName,
+ Namespace: platform.Namespace,
+ },
+ Spec: batchv1.JobSpec{
+ Template: corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Name:
dbMigrationJobCfg.ContainerName,
+ Image:
dbMigrationJobCfg.ToolImageName,
+ Env: []corev1.EnvVar{
+ {
+ Name:
migrateDBDataIndex,
+ Value:
strconv.FormatBool(dbmj.MigrateDBDataIndex),
+ },
+ {
+ Name:
quarkusDataSourceDataIndexJdbcURL,
+ Value:
diQuarkusDataSource.JdbcUrl,
+ },
+ {
+ Name:
quarkusDataSourceDataIndexUserName,
+ Value:
diQuarkusDataSource.Username,
+ },
+ {
+ Name:
quarkusDataSourceDataIndexPassword,
+ Value:
diQuarkusDataSource.Password,
+ },
+ {
+ Name:
quarkusFlywayDataIndexSchemas,
+ Value:
diQuarkusDataSource.Schema,
+ },
+ {
+ Name:
migrateDBJobsService,
+ Value:
strconv.FormatBool(dbmj.MigrateDBJobsService),
+ },
+ {
+ Name:
quarkusDataSourceJobsServiceJdbcURL,
+ Value:
jsQuarkusDataSource.JdbcUrl,
+ },
+ {
+ Name:
quarkusDataSourceJobsServiceUserName,
+ Value:
jsQuarkusDataSource.Username,
+ },
+ {
+ Name:
quarkusDataSourceJobsServicePassword,
+ Value:
jsQuarkusDataSource.Password,
+ },
+ {
+ Name:
quarkusFlywayJobsServiceSchemas,
+ Value:
jsQuarkusDataSource.Schema,
+ },
+ },
+ Command: []string{
+
dbMigrationJobCfg.MigrationCmd,
+ },
+ },
+ },
+ RestartPolicy: "Never",
+ },
+ },
+ BackoffLimit: pointer.Int32(0),
+ },
+ }
+ return job
+}
+
+// GetDBMigrationJobStatus Returns db migration job status
+func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus,
error) {
+ job, err := client.BatchV1().Jobs(platform.Namespace).Get(ctx,
dbMigrationJobName, metav1.GetOptions{})
+ if err != nil {
+ klog.V(log.E).InfoS("Error getting DB migrator job while
monitoring completion: ", "error", err)
+ return nil, err
+ }
+ return &job.Status, nil
+}
+
+// NewSonataFlowPlatformDBMigrationPhase Returns a new DB migration phase for
SonataFlowPlatform
+func NewSonataFlowPlatformDBMigrationPhase(status
operatorapi.DBMigrationStatus, message string, reason string)
*operatorapi.SonataFlowPlatformDBMigrationPhase {
+ return &operatorapi.SonataFlowPlatformDBMigrationPhase{
+ Status: status,
+ Message: message,
+ Reason: reason,
+ }
+}
+
+// UpdateSonataFlowPlatformDBMigrationPhase Updates a given
SonataFlowPlatformDBMigrationPhase with the supplied values
+func UpdateSonataFlowPlatformDBMigrationPhase(dbMigrationStatus
*operatorapi.SonataFlowPlatformDBMigrationPhase, status
operatorapi.DBMigrationStatus, message string, reason string)
*operatorapi.SonataFlowPlatformDBMigrationPhase {
+ if dbMigrationStatus != nil {
+ dbMigrationStatus.Status = status
+ dbMigrationStatus.Message = message
+ dbMigrationStatus.Reason = reason
+ return dbMigrationStatus
+ }
+ return nil
+}
+
+func getKogitoDBMigratorToolImageName() string {
+
+ imgTag := cfg.GetCfg().KogitoDBMigratorToolImageTag
+
+ if imgTag == "" {
+ // returns
"docker.io/apache/incubator-kie-kogito-db-migrator-tool:<tag>"
+ imgTag = fmt.Sprintf("%s-%s:%s", constants.ImageNamePrefix,
constants.KogitoDBMigratorTool, version.GetImageTagVersion())
+ }
+ return imgTag
+}
+
+func newDBMigrationJobCfg() *DBMigrationJobCfg {
+ return &DBMigrationJobCfg{
+ JobName: dbMigrationJobName,
+ ContainerName: dbMigrationContainerName,
+ ToolImageName: getKogitoDBMigratorToolImageName(),
+ MigrationCmd: dbMigrationCmd,
+ }
+}
+
+// ReconcileDBMigrationJob Check the status of running DB migration job and
return status
+func (dbmj DBMigratorJob) ReconcileDBMigrationJob(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus,
error) {
+ platform.Status.SonataFlowPlatformDBMigrationPhase =
NewSonataFlowPlatformDBMigrationPhase(operatorapi.DBMigrationStatusStarted,
operatorapi.MessageDBMigrationStatusStarted,
operatorapi.ReasonDBMigrationStatusStarted)
+
+ dbMigratorJobStatus, err := dbmj.GetDBMigrationJobStatus(ctx, client,
platform)
+ if err != nil {
+ return nil, err
+ }
+
+ klog.V(log.I).InfoS("Db migration job status: ", "active",
dbMigratorJobStatus.Active, "ready", dbMigratorJobStatus.Ready, "failed",
dbMigratorJobStatus.Failed, "success", dbMigratorJobStatus.Succeeded,
"CompletedIndexes", dbMigratorJobStatus.CompletedIndexes, "terminatedPods",
dbMigratorJobStatus.UncountedTerminatedPods)
+
+ if dbMigratorJobStatus.Failed == dbMigrationJobFailed {
Review Comment:
I think these conditions, which check the failure or success of the job by
using constants that share the same value (== 1), are a bit confusing when we
read this method.
I'd recommend creating a function, e.g. hasFailed(status *JobStatus), and doing
these calculations there instead.
##########
packages/sonataflow-operator/internal/controller/platform/k8s.go:
##########
@@ -62,20 +63,86 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
+func (action *serviceAction) createOrUpdateDBMigrationJob(ctx context.Context,
client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler)
(*DBMigratorJob, error) {
+ dbMigratorJob := NewDBMigratorJobData(ctx, action.client, platform,
pshDI, pshJS)
+
+ // Invoke DB Migration only if both or either DI/JS services are
requested, in addition to DBMigrationStrategyJob
+ if dbMigratorJob != nil {
+ klog.V(log.I).InfoS("Starting DB Migration Job: ")
+
+ job := dbMigratorJob.CreateJobDBMigration(platform)
+ if op, err := controllerutil.CreateOrUpdate(ctx, client, job,
func() error {
+ return nil
+ }); err != nil {
+ return dbMigratorJob, err
+ } else {
+ klog.V(log.I).InfoS("DB Migration Job successfully
created on cluster", "operation", op)
Review Comment:
Same here: the namespace and job name are very welcome.
##########
packages/sonataflow-operator/internal/controller/platform/k8s.go:
##########
@@ -62,20 +63,86 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
+func (action *serviceAction) createOrUpdateDBMigrationJob(ctx context.Context,
client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler)
(*DBMigratorJob, error) {
+ dbMigratorJob := NewDBMigratorJobData(ctx, action.client, platform,
pshDI, pshJS)
+
+ // Invoke DB Migration only if both or either DI/JS services are
requested, in addition to DBMigrationStrategyJob
+ if dbMigratorJob != nil {
+ klog.V(log.I).InfoS("Starting DB Migration Job: ")
+
+ job := dbMigratorJob.CreateJobDBMigration(platform)
+ if op, err := controllerutil.CreateOrUpdate(ctx, client, job,
func() error {
+ return nil
+ }); err != nil {
+ return dbMigratorJob, err
+ } else {
+ klog.V(log.I).InfoS("DB Migration Job successfully
created on cluster", "operation", op)
+ }
+ }
+ return dbMigratorJob, nil
+}
+
+func (action *serviceAction) handleDBMigrationJob(ctx context.Context,
platform *operatorapi.SonataFlowPlatform, psDI services.PlatformServiceHandler,
psJS services.PlatformServiceHandler) (*operatorapi.SonataFlowPlatform, error) {
+
+ dbMigratorJob, err := action.createOrUpdateDBMigrationJob(ctx,
action.client, platform, psDI, psJS)
+ if err != nil {
+ return nil, err
+ }
+ if dbMigratorJob != nil {
+ klog.V(log.E).InfoS("Created DB migration job")
+ dbMigratorJobStatus, err :=
dbMigratorJob.ReconcileDBMigrationJob(ctx, action.client, platform)
+ if err != nil {
+ return nil, err
+ }
+ if dbMigratorJobStatus.Failed == 1 {
+ return nil, errors.New("DB migration job failed")
+ } else if dbMigratorJobStatus.Succeeded == 1 {
+ return platform, nil
+ } else {
+ // DB migration is still running
+ return nil, nil
+ }
+ }
+
+ return platform, nil
+}
+
+func (action *serviceAction) isJobsBasedDBMigration(platform
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS
services.PlatformServiceHandler) bool {
+ diJobsBasedDBMigration := false
+ jsJobsBasedDBMigration := false
+
+ if pshDI.IsPersistenceEnabledtInSpec() {
+ diJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence)
+ }
+ if pshJS.IsPersistenceEnabledtInSpec() {
+ jsJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence)
+ }
+
+ return (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) ||
(pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration)
+}
+
func (action *serviceAction) Handle(ctx context.Context, platform
*operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform,
*corev1.Event, error) {
// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, nil, err
}
psDI := services.NewDataIndexHandler(platform)
+ psJS := services.NewJobServiceHandler(platform)
+
+ if action.isJobsBasedDBMigration(platform, psDI, psJS) {
+ p, err := action.handleDBMigrationJob(ctx, platform, psDI, psJS)
+ if p == nil {
Review Comment:
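If p == nil we return err, but err can also be nil at that point
(handleDBMigrationJob returns nil, nil while the migration is still running), not good.
I suggest making this clearer as part of the suggestions for the
isJobsBasedDBMigration and handleDBMigrationJob functions.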
##########
packages/sonataflow-operator/internal/controller/platform/k8s.go:
##########
@@ -62,20 +63,86 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
+func (action *serviceAction) createOrUpdateDBMigrationJob(ctx context.Context,
client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler)
(*DBMigratorJob, error) {
+ dbMigratorJob := NewDBMigratorJobData(ctx, action.client, platform,
pshDI, pshJS)
+
+ // Invoke DB Migration only if both or either DI/JS services are
requested, in addition to DBMigrationStrategyJob
+ if dbMigratorJob != nil {
+ klog.V(log.I).InfoS("Starting DB Migration Job: ")
Review Comment:
Printing the job namespace and name here will be very helpful for future error
tracing, etc.
##########
packages/sonataflow-operator/internal/controller/platform/k8s.go:
##########
@@ -62,20 +63,86 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
+func (action *serviceAction) createOrUpdateDBMigrationJob(ctx context.Context,
client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler)
(*DBMigratorJob, error) {
+ dbMigratorJob := NewDBMigratorJobData(ctx, action.client, platform,
pshDI, pshJS)
+
+ // Invoke DB Migration only if both or either DI/JS services are
requested, in addition to DBMigrationStrategyJob
+ if dbMigratorJob != nil {
+ klog.V(log.I).InfoS("Starting DB Migration Job: ")
+
+ job := dbMigratorJob.CreateJobDBMigration(platform)
+ if op, err := controllerutil.CreateOrUpdate(ctx, client, job,
func() error {
+ return nil
+ }); err != nil {
+ return dbMigratorJob, err
+ } else {
+ klog.V(log.I).InfoS("DB Migration Job successfully
created on cluster", "operation", op)
+ }
+ }
+ return dbMigratorJob, nil
+}
+
+func (action *serviceAction) handleDBMigrationJob(ctx context.Context,
platform *operatorapi.SonataFlowPlatform, psDI services.PlatformServiceHandler,
psJS services.PlatformServiceHandler) (*operatorapi.SonataFlowPlatform, error) {
+
+ dbMigratorJob, err := action.createOrUpdateDBMigrationJob(ctx,
action.client, platform, psDI, psJS)
+ if err != nil {
+ return nil, err
+ }
+ if dbMigratorJob != nil {
+ klog.V(log.E).InfoS("Created DB migration job")
+ dbMigratorJobStatus, err :=
dbMigratorJob.ReconcileDBMigrationJob(ctx, action.client, platform)
+ if err != nil {
+ return nil, err
+ }
+ if dbMigratorJobStatus.Failed == 1 {
+ return nil, errors.New("DB migration job failed")
+ } else if dbMigratorJobStatus.Succeeded == 1 {
+ return platform, nil
+ } else {
+ // DB migration is still running
+ return nil, nil
+ }
+ }
+
+ return platform, nil
+}
+
+func (action *serviceAction) isJobsBasedDBMigration(platform
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS
services.PlatformServiceHandler) bool {
Review Comment:
I don't believe this method belongs to the serviceAction struct.
##########
packages/sonataflow-operator/internal/controller/platform/k8s.go:
##########
@@ -62,20 +63,86 @@ func (action *serviceAction) CanHandle(platform
*operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}
+func (action *serviceAction) createOrUpdateDBMigrationJob(ctx context.Context,
client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler)
(*DBMigratorJob, error) {
+ dbMigratorJob := NewDBMigratorJobData(ctx, action.client, platform,
pshDI, pshJS)
+
+ // Invoke DB Migration only if both or either DI/JS services are
requested, in addition to DBMigrationStrategyJob
+ if dbMigratorJob != nil {
+ klog.V(log.I).InfoS("Starting DB Migration Job: ")
+
+ job := dbMigratorJob.CreateJobDBMigration(platform)
+ if op, err := controllerutil.CreateOrUpdate(ctx, client, job,
func() error {
+ return nil
+ }); err != nil {
+ return dbMigratorJob, err
+ } else {
+ klog.V(log.I).InfoS("DB Migration Job successfully
created on cluster", "operation", op)
+ }
+ }
+ return dbMigratorJob, nil
+}
+
+func (action *serviceAction) handleDBMigrationJob(ctx context.Context,
platform *operatorapi.SonataFlowPlatform, psDI services.PlatformServiceHandler,
psJS services.PlatformServiceHandler) (*operatorapi.SonataFlowPlatform, error) {
+
+ dbMigratorJob, err := action.createOrUpdateDBMigrationJob(ctx,
action.client, platform, psDI, psJS)
+ if err != nil {
+ return nil, err
+ }
+ if dbMigratorJob != nil {
+ klog.V(log.E).InfoS("Created DB migration job")
+ dbMigratorJobStatus, err :=
dbMigratorJob.ReconcileDBMigrationJob(ctx, action.client, platform)
+ if err != nil {
+ return nil, err
+ }
+ if dbMigratorJobStatus.Failed == 1 {
+ return nil, errors.New("DB migration job failed")
+ } else if dbMigratorJobStatus.Succeeded == 1 {
+ return platform, nil
+ } else {
+ // DB migration is still running
+ return nil, nil
+ }
+ }
+
+ return platform, nil
+}
+
+func (action *serviceAction) isJobsBasedDBMigration(platform
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS
services.PlatformServiceHandler) bool {
+ diJobsBasedDBMigration := false
+ jsJobsBasedDBMigration := false
+
+ if pshDI.IsPersistenceEnabledtInSpec() {
Review Comment:
Consider my comments regarding a potential persistence configuration at the
SFP (SonataFlowPlatform) level.
##########
packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package platform
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/pointer"
+
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
+
+ operatorapi
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/cfg"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+ "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
+type QuarkusDataSource struct {
+ JdbcUrl string
+ Username string
+ Password string
+ Schema string
+}
+
+type DBMigratorJob struct {
+ MigrateDBDataIndex bool
+ DataIndexDataSource *QuarkusDataSource
+ MigrateDBJobsService bool
+ JobsServiceDataSource *QuarkusDataSource
+}
+
+const (
+ dbMigrationJobName = "sonataflow-db-migrator-job"
+ dbMigrationContainerName = "db-migration-container"
+ dbMigratorToolImage =
"quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest"
+ dbMigrationCmd = "./migration.sh"
+ dbMigrationJobFailed = 1
+ dbMigrationJobSucceeded = 1
+
+ migrateDBDataIndex = "MIGRATE_DB_DATAINDEX"
+ quarkusDataSourceDataIndexJdbcURL =
"QUARKUS_DATASOURCE_DATAINDEX_JDBC_URL"
+ quarkusDataSourceDataIndexUserName =
"QUARKUS_DATASOURCE_DATAINDEX_USERNAME"
+ quarkusDataSourceDataIndexPassword =
"QUARKUS_DATASOURCE_DATAINDEX_PASSWORD"
+ quarkusFlywayDataIndexSchemas = "QUARKUS_FLYWAY_DATAINDEX_SCHEMAS"
+
+ migrateDBJobsService = "MIGRATE_DB_JOBSSERVICE"
+ quarkusDataSourceJobsServiceJdbcURL =
"QUARKUS_DATASOURCE_JOBSSERVICE_JDBC_URL"
+ quarkusDataSourceJobsServiceUserName =
"QUARKUS_DATASOURCE_JOBSSERVICE_USERNAME"
+ quarkusDataSourceJobsServicePassword =
"QUARKUS_DATASOURCE_JOBSSERVICE_PASSWORD"
+ quarkusFlywayJobsServiceSchemas =
"QUARKUS_FLYWAY_JOBSSERVICE_SCHEMAS"
+)
+
+type DBMigrationJobCfg struct {
+ JobName string
+ ContainerName string
+ ToolImageName string
+ MigrationCmd string
+}
+
+func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL,
defaultSchemaName string) string {
+ if persistencePostgreSQL != nil && persistencePostgreSQL.ServiceRef !=
nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 {
+ return persistencePostgreSQL.ServiceRef.DatabaseSchema
+ }
+
+ if persistencePostgreSQL != nil && len(persistencePostgreSQL.JdbcUrl) >
0 {
+ jdbcURL := persistencePostgreSQL.JdbcUrl
+ _, a, found := strings.Cut(jdbcURL, "currentSchema=")
+
+ if found {
+ if strings.Contains(a, "&") {
+ b, _, found := strings.Cut(a, "&")
+ if found {
+ return b
+ }
+ } else {
+ return a
+ }
+ }
+ }
+ return defaultSchemaName
+}
+
+func getJdbcUrl(postgresql *operatorapi.PersistencePostgreSQL,
defaultDBSchemaName string) string {
+ databaseSchema := defaultDBSchemaName
+ dataSourcePort := constants.DefaultPostgreSQLPort
+ databaseName := constants.DefaultDatabaseName
+ postgresServiceName := constants.DefaultPostgresServiceName
+
+ dataSourceURL := ""
+
+ if postgresql.ServiceRef != nil {
+ if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
+ databaseSchema = postgresql.ServiceRef.DatabaseSchema
+ }
+ if postgresql.ServiceRef.Port != nil {
+ dataSourcePort = *postgresql.ServiceRef.Port
+ }
+ if len(postgresql.ServiceRef.DatabaseName) > 0 {
+ databaseName = postgresql.ServiceRef.DatabaseName
+ }
+ dataSourceURL =
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s",
postgresql.ServiceRef.Name, dataSourcePort, databaseName, databaseSchema)
+ } else if len(postgresql.JdbcUrl) > 0 {
+ dataSourceURL = postgresql.JdbcUrl
+ } else {
+ dataSourceURL =
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s", postgresServiceName,
dataSourcePort, databaseName, databaseSchema)
+ }
+
+ return dataSourceURL
+}
+
+func getQuarkusDataSourceFromPersistence(ctx context.Context, platform
*operatorapi.SonataFlowPlatform, persistence
*operatorapi.PersistenceOptionsSpec, defaultSchemaName string)
*QuarkusDataSource {
+ if persistence != nil && persistence.PostgreSQL != nil {
+ quarkusDataSource := &QuarkusDataSource{}
+ quarkusDataSource.JdbcUrl = getJdbcUrl(persistence.PostgreSQL,
defaultSchemaName)
+ quarkusDataSource.Username, _ =
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name,
persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace)
+ quarkusDataSource.Password, _ =
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name,
persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace)
+ quarkusDataSource.Schema =
getDBSchemaName(persistence.PostgreSQL, defaultSchemaName)
+ return quarkusDataSource
+ }
+
+ return nil
+}
+
+func NewDBMigratorJobData(ctx context.Context, client client.Client, platform
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS
services.PlatformServiceHandler) *DBMigratorJob {
+
+ diJobsBasedDBMigration := false
+ jsJobsBasedDBMigration := false
+
+ if pshDI.IsPersistenceEnabledtInSpec() {
+ diJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence)
+ }
+ if pshJS.IsPersistenceEnabledtInSpec() {
+ jsJobsBasedDBMigration =
services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence)
+ }
+
+ if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) ||
(pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) {
+ quarkusDataSourceDataIndex := &QuarkusDataSource{}
+ quarkusDataSourceJobService := &QuarkusDataSource{}
+
+ if diJobsBasedDBMigration {
+ quarkusDataSourceDataIndex =
getQuarkusDataSourceFromPersistence(ctx, platform,
platform.Spec.Services.DataIndex.Persistence, "data-index-service")
+ }
+
+ if jsJobsBasedDBMigration {
+ quarkusDataSourceJobService =
getQuarkusDataSourceFromPersistence(ctx, platform,
platform.Spec.Services.JobService.Persistence, "jobs-service")
+ }
+
+ return &DBMigratorJob{
+ MigrateDBDataIndex: diJobsBasedDBMigration,
+ DataIndexDataSource: quarkusDataSourceDataIndex,
+ MigrateDBJobsService: jsJobsBasedDBMigration,
+ JobsServiceDataSource: quarkusDataSourceJobService,
+ }
+ }
+ return nil
+}
+
+func newQuarkusDataSource(jdbcURL string, userName string, password string,
schema string) *QuarkusDataSource {
+ return &QuarkusDataSource{
+ JdbcUrl: jdbcURL,
+ Username: userName,
+ Password: password,
+ Schema: schema,
+ }
+}
+
+func (dbmj DBMigratorJob) CreateJobDBMigration(platform
*operatorapi.SonataFlowPlatform) *batchv1.Job {
+
+ diQuarkusDataSource := newQuarkusDataSource("", "", "", "")
+ jsQuarkusDataSource := newQuarkusDataSource("", "", "", "")
+
+ if dbmj.DataIndexDataSource != nil {
+ diQuarkusDataSource.JdbcUrl = dbmj.DataIndexDataSource.JdbcUrl
+ diQuarkusDataSource.Username = dbmj.DataIndexDataSource.Username
+ diQuarkusDataSource.Password = dbmj.DataIndexDataSource.Password
+ diQuarkusDataSource.Schema = dbmj.DataIndexDataSource.Schema
+ }
+
+ if dbmj.JobsServiceDataSource != nil {
+ jsQuarkusDataSource.JdbcUrl = dbmj.JobsServiceDataSource.JdbcUrl
+ jsQuarkusDataSource.Username =
dbmj.JobsServiceDataSource.Username
+ jsQuarkusDataSource.Password =
dbmj.JobsServiceDataSource.Password
+ jsQuarkusDataSource.Schema = dbmj.JobsServiceDataSource.Schema
+ }
+
+ dbMigrationJobCfg := newDBMigrationJobCfg()
+ job := &batchv1.Job{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: dbMigrationJobCfg.JobName,
+ Namespace: platform.Namespace,
+ },
+ Spec: batchv1.JobSpec{
+ Template: corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Name:
dbMigrationJobCfg.ContainerName,
+ Image:
dbMigrationJobCfg.ToolImageName,
+ Env: []corev1.EnvVar{
+ {
+ Name:
migrateDBDataIndex,
+ Value:
strconv.FormatBool(dbmj.MigrateDBDataIndex),
+ },
+ {
+ Name:
quarkusDataSourceDataIndexJdbcURL,
+ Value:
diQuarkusDataSource.JdbcUrl,
+ },
+ {
+ Name:
quarkusDataSourceDataIndexUserName,
+ Value:
diQuarkusDataSource.Username,
+ },
+ {
+ Name:
quarkusDataSourceDataIndexPassword,
+ Value:
diQuarkusDataSource.Password,
+ },
+ {
+ Name:
quarkusFlywayDataIndexSchemas,
+ Value:
diQuarkusDataSource.Schema,
+ },
+ {
+ Name:
migrateDBJobsService,
+ Value:
strconv.FormatBool(dbmj.MigrateDBJobsService),
+ },
+ {
+ Name:
quarkusDataSourceJobsServiceJdbcURL,
+ Value:
jsQuarkusDataSource.JdbcUrl,
+ },
+ {
+ Name:
quarkusDataSourceJobsServiceUserName,
+ Value:
jsQuarkusDataSource.Username,
+ },
+ {
+ Name:
quarkusDataSourceJobsServicePassword,
+ Value:
jsQuarkusDataSource.Password,
+ },
+ {
+ Name:
quarkusFlywayJobsServiceSchemas,
+ Value:
jsQuarkusDataSource.Schema,
+ },
+ },
+ Command: []string{
+
dbMigrationJobCfg.MigrationCmd,
+ },
+ },
+ },
+ RestartPolicy: "Never",
+ },
+ },
+ BackoffLimit: pointer.Int32(0),
+ },
+ }
+ return job
+}
+
+// GetDBMigrationJobStatus Returns db migration job status
+func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus,
error) {
+ job, err := client.BatchV1().Jobs(platform.Namespace).Get(ctx,
dbMigrationJobName, metav1.GetOptions{})
+ if err != nil {
+ klog.V(log.E).InfoS("Error getting DB migrator job while
monitoring completion: ", "error", err)
+ return nil, err
+ }
+ return &job.Status, nil
+}
+
+// NewSonataFlowPlatformDBMigrationPhase Returns a new DB migration phase for
SonataFlowPlatform
+func NewSonataFlowPlatformDBMigrationPhase(status
operatorapi.DBMigrationStatus, message string, reason string)
*operatorapi.SonataFlowPlatformDBMigrationPhase {
+ return &operatorapi.SonataFlowPlatformDBMigrationPhase{
+ Status: status,
+ Message: message,
+ Reason: reason,
+ }
+}
+
+// UpdateSonataFlowPlatformDBMigrationPhase Updates a given
SonataFlowPlatformDBMigrationPhase with the supplied values
+func UpdateSonataFlowPlatformDBMigrationPhase(dbMigrationStatus
*operatorapi.SonataFlowPlatformDBMigrationPhase, status
operatorapi.DBMigrationStatus, message string, reason string)
*operatorapi.SonataFlowPlatformDBMigrationPhase {
+ if dbMigrationStatus != nil {
+ dbMigrationStatus.Status = status
+ dbMigrationStatus.Message = message
+ dbMigrationStatus.Reason = reason
+ return dbMigrationStatus
+ }
+ return nil
+}
+
+func getKogitoDBMigratorToolImageName() string {
+
+ imgTag := cfg.GetCfg().KogitoDBMigratorToolImageTag
+
+ if imgTag == "" {
+ // returns
"docker.io/apache/incubator-kie-kogito-db-migrator-tool:<tag>"
+ imgTag = fmt.Sprintf("%s-%s:%s", constants.ImageNamePrefix,
constants.KogitoDBMigratorTool, version.GetImageTagVersion())
+ }
+ return imgTag
+}
+
+func newDBMigrationJobCfg() *DBMigrationJobCfg {
+ return &DBMigrationJobCfg{
+ JobName: dbMigrationJobName,
+ ContainerName: dbMigrationContainerName,
+ ToolImageName: getKogitoDBMigratorToolImageName(),
+ MigrationCmd: dbMigrationCmd,
+ }
+}
+
+// ReconcileDBMigrationJob Check the status of running DB migration job and
return status
+func (dbmj DBMigratorJob) ReconcileDBMigrationJob(ctx context.Context, client
client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus,
error) {
+ platform.Status.SonataFlowPlatformDBMigrationPhase =
NewSonataFlowPlatformDBMigrationPhase(operatorapi.DBMigrationStatusStarted,
operatorapi.MessageDBMigrationStatusStarted,
operatorapi.ReasonDBMigrationStatusStarted)
+
+ dbMigratorJobStatus, err := dbmj.GetDBMigrationJobStatus(ctx, client,
platform)
+ if err != nil {
+ return nil, err
+ }
+
+ klog.V(log.I).InfoS("Db migration job status: ", "active",
dbMigratorJobStatus.Active, "ready", dbMigratorJobStatus.Ready, "failed",
dbMigratorJobStatus.Failed, "success", dbMigratorJobStatus.Succeeded,
"CompletedIndexes", dbMigratorJobStatus.CompletedIndexes, "terminatedPods",
dbMigratorJobStatus.UncountedTerminatedPods)
+
+ if dbMigratorJobStatus.Failed == dbMigrationJobFailed {
+ platform.Status.SonataFlowPlatformDBMigrationPhase =
UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase,
operatorapi.DBMigrationStatusFailed,
operatorapi.MessageDBMigrationStatusFailed,
operatorapi.ReasonDBMigrationStatusFailed)
+ klog.V(log.I).InfoS("DB migration job failed")
+ return dbMigratorJobStatus, errors.New("DB migration job
failed")
+ } else if dbMigratorJobStatus.Succeeded == dbMigrationJobSucceeded {
+ platform.Status.SonataFlowPlatformDBMigrationPhase =
UpdateSonataFlowPlatformDBMigrationPhase(platform.Status.SonataFlowPlatformDBMigrationPhase,
operatorapi.DBMigrationStatusSucceeded,
operatorapi.MessageDBMigrationStatusSucceeded,
operatorapi.ReasonDBMigrationStatusSucceeded)
+ klog.V(log.I).InfoS("DB migration job succeeded")
Review Comment:
same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]