jakubschwan commented on code in PR #2790:
URL: 
https://github.com/apache/incubator-kie-tools/pull/2790#discussion_r1923813149


##########
packages/sonataflow-operator/api/v1alpha08/sonataflow_persistence_types.go:
##########
@@ -54,10 +62,13 @@ type PersistenceOptionsSpec struct {
        // +optional
        PostgreSQL *PersistencePostgreSQL `json:"postgresql,omitempty"`
 
-       // Whether to migrate database on service startup?
+       // DB Migration approach for service?

Review Comment:
   Please update the description to better describe the configuration for DB 
migration, and remove the question mark from the description.



##########
packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package platform
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+
+       batchv1 "k8s.io/api/batch/v1"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/klog/v2"
+       "k8s.io/utils/pointer"
+       "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
+
+       operatorapi 
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/cfg"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services"
+       
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
+       "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log"
+)
+
// QuarkusDataSource holds the Quarkus datasource settings (JDBC URL,
// credentials, and Flyway schema) that are passed to the DB migrator job
// as environment variables.
type QuarkusDataSource struct {
	JdbcUrl  string
	Username string
	Password string
	Schema   string
}
+
// DBMigratorJob captures which services (Data Index and/or Jobs Service)
// require a job-based DB migration, together with the datasource each
// migration should run against.
type DBMigratorJob struct {
	MigrateDBDataIndex    bool
	DataIndexDataSource   *QuarkusDataSource
	MigrateDBJobsService  bool
	JobsServiceDataSource *QuarkusDataSource
}
+
// DBMigratorJobStatus pairs the migration Job's name with its batch JobStatus
// as observed on the cluster.
type DBMigratorJobStatus struct {
	Name           string
	BatchJobStatus *batchv1.JobStatus
}
+
const (
	// Name, container name, and entrypoint command of the Kubernetes Job that
	// runs the DB migration tool.
	dbMigrationJobName       = "sonataflow-db-migrator-job"
	dbMigrationContainerName = "db-migration-container"
	dbMigrationCmd           = "./migration.sh"

	// Thresholds compared against batchv1.JobStatus.Failed / .Succeeded pod
	// counts when deciding whether the migration job has finished.
	dbMigrationJobFailed    = 1
	dbMigrationJobSucceeded = 1

	// Environment variables consumed by the migrator image for the Data Index
	// datasource.
	migrateDBDataIndex                 = "MIGRATE_DB_DATAINDEX"
	quarkusDataSourceDataIndexJdbcURL  = "QUARKUS_DATASOURCE_DATAINDEX_JDBC_URL"
	quarkusDataSourceDataIndexUserName = "QUARKUS_DATASOURCE_DATAINDEX_USERNAME"
	quarkusDataSourceDataIndexPassword = "QUARKUS_DATASOURCE_DATAINDEX_PASSWORD"
	quarkusFlywayDataIndexSchemas      = "QUARKUS_FLYWAY_DATAINDEX_SCHEMAS"

	// Environment variables consumed by the migrator image for the Jobs
	// Service datasource.
	migrateDBJobsService                 = "MIGRATE_DB_JOBSSERVICE"
	quarkusDataSourceJobsServiceJdbcURL  = "QUARKUS_DATASOURCE_JOBSSERVICE_JDBC_URL"
	quarkusDataSourceJobsServiceUserName = "QUARKUS_DATASOURCE_JOBSSERVICE_USERNAME"
	quarkusDataSourceJobsServicePassword = "QUARKUS_DATASOURCE_JOBSSERVICE_PASSWORD"
	quarkusFlywayJobsServiceSchemas      = "QUARKUS_FLYWAY_JOBSSERVICE_SCHEMAS"
)
+
// DBMigrationJobCfg groups the static configuration of the migration Job:
// the Job and container names, the migrator tool image, and the command to run.
type DBMigrationJobCfg struct {
	JobName       string
	ContainerName string
	ToolImageName string
	MigrationCmd  string
}
+
+func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, 
defaultSchemaName string) string {
+       if persistencePostgreSQL != nil && persistencePostgreSQL.ServiceRef != 
nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 {
+               return persistencePostgreSQL.ServiceRef.DatabaseSchema
+       }
+
+       if persistencePostgreSQL != nil && len(persistencePostgreSQL.JdbcUrl) > 
0 {
+               jdbcURL := persistencePostgreSQL.JdbcUrl
+               _, a, found := strings.Cut(jdbcURL, "currentSchema=")
+
+               if found {
+                       if strings.Contains(a, "&") {
+                               b, _, found := strings.Cut(a, "&")
+                               if found {
+                                       return b
+                               }
+                       } else {
+                               return a
+                       }
+               }
+       }
+       return defaultSchemaName
+}
+
+func getJdbcUrl(postgresql *operatorapi.PersistencePostgreSQL, 
defaultDBSchemaName string) string {
+       databaseSchema := defaultDBSchemaName
+       dataSourcePort := constants.DefaultPostgreSQLPort
+       databaseName := constants.DefaultDatabaseName
+       postgresServiceName := constants.DefaultPostgresServiceName
+
+       dataSourceURL := ""
+
+       if postgresql.ServiceRef != nil {
+               if len(postgresql.ServiceRef.DatabaseSchema) > 0 {
+                       databaseSchema = postgresql.ServiceRef.DatabaseSchema
+               }
+               if postgresql.ServiceRef.Port != nil {
+                       dataSourcePort = *postgresql.ServiceRef.Port
+               }
+               if len(postgresql.ServiceRef.DatabaseName) > 0 {
+                       databaseName = postgresql.ServiceRef.DatabaseName
+               }
+               dataSourceURL = 
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s", 
postgresql.ServiceRef.Name, dataSourcePort, databaseName, databaseSchema)
+       } else if len(postgresql.JdbcUrl) > 0 {
+               dataSourceURL = postgresql.JdbcUrl
+       } else {
+               dataSourceURL = 
fmt.Sprintf("jdbc:postgresql://%s:%d/%s?currentSchema=%s", postgresServiceName, 
dataSourcePort, databaseName, databaseSchema)
+       }
+
+       return dataSourceURL
+}
+
+func getQuarkusDataSourceFromPersistence(ctx context.Context, platform 
*operatorapi.SonataFlowPlatform, persistence 
*operatorapi.PersistenceOptionsSpec, defaultSchemaName string) 
*QuarkusDataSource {
+       if persistence != nil && persistence.PostgreSQL != nil {
+               quarkusDataSource := &QuarkusDataSource{}
+               quarkusDataSource.JdbcUrl = getJdbcUrl(persistence.PostgreSQL, 
defaultSchemaName)
+               quarkusDataSource.Username, _ = 
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, 
persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace)
+               quarkusDataSource.Password, _ = 
services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, 
persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace)
+               quarkusDataSource.Schema = 
getDBSchemaName(persistence.PostgreSQL, defaultSchemaName)
+               return quarkusDataSource
+       }
+
+       return nil
+}
+
+func NewDBMigratorJobData(ctx context.Context, client client.Client, platform 
*operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS 
services.PlatformServiceHandler) *DBMigratorJob {
+
+       diJobsBasedDBMigration := false
+       jsJobsBasedDBMigration := false
+
+       if pshDI.IsPersistenceEnabledtInSpec() {
+               diJobsBasedDBMigration = 
services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence)
+       }
+       if pshJS.IsPersistenceEnabledtInSpec() {
+               jsJobsBasedDBMigration = 
services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence)
+       }
+
+       if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || 
(pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) {
+               quarkusDataSourceDataIndex := &QuarkusDataSource{}
+               quarkusDataSourceJobService := &QuarkusDataSource{}
+
+               if diJobsBasedDBMigration {
+                       quarkusDataSourceDataIndex = 
getQuarkusDataSourceFromPersistence(ctx, platform, 
platform.Spec.Services.DataIndex.Persistence, "data-index-service")
+               }
+
+               if jsJobsBasedDBMigration {
+                       quarkusDataSourceJobService = 
getQuarkusDataSourceFromPersistence(ctx, platform, 
platform.Spec.Services.JobService.Persistence, "jobs-service")
+               }
+
+               return &DBMigratorJob{
+                       MigrateDBDataIndex:    diJobsBasedDBMigration,
+                       DataIndexDataSource:   quarkusDataSourceDataIndex,
+                       MigrateDBJobsService:  jsJobsBasedDBMigration,
+                       JobsServiceDataSource: quarkusDataSourceJobService,
+               }
+       }
+       return nil
+}
+
+// IsJobsBasedDBMigration returns whether job based db migration approach is 
needed?
+func IsJobsBasedDBMigration(platform *operatorapi.SonataFlowPlatform, pshDI 
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) bool {
+       diJobsBasedDBMigration := false
+       jsJobsBasedDBMigration := false
+
+       if pshDI.IsPersistenceEnabledtInSpec() {
+               diJobsBasedDBMigration = 
services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence)
+       }
+       if pshJS.IsPersistenceEnabledtInSpec() {
+               jsJobsBasedDBMigration = 
services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence)
+       }
+
+       return (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || 
(pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration)
+}
+
+func createOrUpdateDBMigrationJob(ctx context.Context, client client.Client, 
platform *operatorapi.SonataFlowPlatform, pshDI 
services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) 
(*DBMigratorJob, error) {
+       dbMigratorJob := NewDBMigratorJobData(ctx, client, platform, pshDI, 
pshJS)
+
+       // Invoke DB Migration only if both or either DI/JS services are 
requested, in addition to DBMigrationStrategyJob
+       if dbMigratorJob != nil {
+               job := createJobDBMigration(platform, dbMigratorJob)
+               klog.V(log.I).InfoS("Starting DB Migration Job: ", "namespace", 
platform.Namespace, "job", job.Name)
+               if op, err := controllerutil.CreateOrUpdate(ctx, client, job, 
func() error {
+                       return nil
+               }); err != nil {
+                       return dbMigratorJob, err
+               } else {
+                       klog.V(log.I).InfoS("DB Migration Job successfully 
created on cluster", "operation", op, "namespace", platform.Namespace, "job", 
job.Name)
+               }
+       }
+       return dbMigratorJob, nil
+}
+
+// HandleDBMigrationJob Creates db migration job and executes it on the cluster
+func HandleDBMigrationJob(ctx context.Context, client client.Client, platform 
*operatorapi.SonataFlowPlatform, psDI services.PlatformServiceHandler, psJS 
services.PlatformServiceHandler) (*operatorapi.SonataFlowPlatform, error) {
+
+       dbMigratorJob, err := createOrUpdateDBMigrationJob(ctx, client, 
platform, psDI, psJS)
+       if err != nil {
+               return nil, err
+       }
+       if dbMigratorJob != nil {
+               klog.V(log.E).InfoS("Created DB migration job")
+               dbMigratorJobStatus, err := 
dbMigratorJob.ReconcileDBMigrationJob(ctx, client, platform)
+               if err != nil {
+                       return nil, err
+               }
+               if dbMigratorJobStatus.BatchJobStatus.Failed == 1 {
+                       return nil, errors.New("DB migration job " + 
dbMigratorJobStatus.Name + " failed in namespace: " + platform.Namespace)
+               } else if dbMigratorJobStatus.BatchJobStatus.Succeeded == 1 {

Review Comment:
   As there are functions for checking these statuses, can you replace these 
conditions with the `hasFailed` and `hasSucceeded` functions?



##########
packages/sonataflow-operator/config/manager/controllers_cfg.yaml:
##########
@@ -30,6 +30,8 @@ jobsServiceEphemeralImageTag: 
"docker.io/apache/incubator-kie-kogito-jobs-servic
 # The Data Index image to use, if empty the operator will use the default 
Apache Community one based on the current operator's version
 dataIndexPostgreSQLImageTag: 
"docker.io/apache/incubator-kie-kogito-data-index-postgresql:main"
 dataIndexEphemeralImageTag: 
"docker.io/apache/incubator-kie-kogito-data-index-ephemeral:main"
+# The Kogito PostgreSQL DB Migrator image to use (TBD: to replace with apache 
image)
+dbMigratorToolImageTag: 
"quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest"

Review Comment:
   do not forget to replace with apache image



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to