wmedvede commented on code in PR #2790: URL: https://github.com/apache/incubator-kie-tools/pull/2790#discussion_r1894144756
########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigratorToolImage = 
"quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" + dbMigrationCmd = "./migration.sh" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string + MigrationCmd string +} + +func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { + jdbcURL := persistencePostgreSQL.JdbcUrl + + if len(jdbcURL) == 0 { + if persistencePostgreSQL.ServiceRef != nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 { + return persistencePostgreSQL.ServiceRef.DatabaseSchema + } + } else { + _, a, found := strings.Cut(jdbcURL, "currentSchema=") + + if found { + if strings.Contains(a, "&") { + b, _, found := strings.Cut(a, "&") + if found { + return b + } + } else { + return a + } + } + } + return defaultSchemaName +} + +func getQuarkusDataSourceFromPersistence(ctx context.Context, platform *operatorapi.SonataFlowPlatform, persistence *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + if persistence != nil && persistence.PostgreSQL != nil { + quarkusDataSource := &QuarkusDataSource{} + quarkusDataSource.JdbcUrl = persistence.PostgreSQL.JdbcUrl + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = getDBSchemaName(persistence.PostgreSQL, defaultSchemaName) + return quarkusDataSource + } + + return nil +} + +func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) *DBMigratorJob { + + diJobsBasedDBMigration := false + 
jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) { + quarkusDataSourceDataIndex := &QuarkusDataSource{} + quarkusDataSourceJobService := &QuarkusDataSource{} + + if diJobsBasedDBMigration { + quarkusDataSourceDataIndex = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.DataIndex.Persistence, "defaultDi") Review Comment: default schema names for the services must be consistent with what we do for each service, "defaultDI" is not good. ########## packages/sonataflow-operator/api/v1alpha08/sonataflow_persistence_types.go: ########## @@ -17,6 +17,14 @@ package v1alpha08 +type DBMigrationStrategyType string + +const ( + DBMigrationStrategyService DBMigrationStrategyType = "service" + DBMigrationStrategyJob DBMigrationStrategyType = "job" Review Comment: @rhkp @ricardozanini wondering if we should call this strategy "operator" instead :thinking: , and we don't "expose" how it is performed. ########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigratorToolImage = "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" + dbMigrationCmd = "./migration.sh" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string + MigrationCmd string +} + +func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { + jdbcURL := persistencePostgreSQL.JdbcUrl + + if len(jdbcURL) == 0 { + if persistencePostgreSQL.ServiceRef != nil && 
len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 { + return persistencePostgreSQL.ServiceRef.DatabaseSchema + } + } else { + _, a, found := strings.Cut(jdbcURL, "currentSchema=") + + if found { + if strings.Contains(a, "&") { + b, _, found := strings.Cut(a, "&") + if found { + return b + } + } else { + return a + } + } + } + return defaultSchemaName +} + +func getQuarkusDataSourceFromPersistence(ctx context.Context, platform *operatorapi.SonataFlowPlatform, persistence *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + if persistence != nil && persistence.PostgreSQL != nil { + quarkusDataSource := &QuarkusDataSource{} + quarkusDataSource.JdbcUrl = persistence.PostgreSQL.JdbcUrl + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = getDBSchemaName(persistence.PostgreSQL, defaultSchemaName) + return quarkusDataSource + } + + return nil +} + +func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) *DBMigratorJob { + + diJobsBasedDBMigration := false + jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) { + quarkusDataSourceDataIndex := &QuarkusDataSource{} + 
quarkusDataSourceJobService := &QuarkusDataSource{} + + if diJobsBasedDBMigration { + quarkusDataSourceDataIndex = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.DataIndex.Persistence, "defaultDi") + } + + if jsJobsBasedDBMigration { + quarkusDataSourceJobService = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.JobService.Persistence, "defaultJs") + } + + return &DBMigratorJob{ + MigrateDBDataIndex: diJobsBasedDBMigration, + DataIndexDataSource: quarkusDataSourceDataIndex, + MigrateDBJobsService: jsJobsBasedDBMigration, + JobsServiceDataSource: quarkusDataSourceJobService, + } + } + return nil +} + +func getNewQuarkusDataSource(jdbcURL string, userName string, password string, schema string) *QuarkusDataSource { Review Comment: I'd prefer to call this method simply newQuarkusDataSource ########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigratorToolImage = "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" + dbMigrationCmd = "./migration.sh" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string + MigrationCmd string +} + +func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { + jdbcURL := persistencePostgreSQL.JdbcUrl + + if len(jdbcURL) == 0 { + if persistencePostgreSQL.ServiceRef != nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 { + return persistencePostgreSQL.ServiceRef.DatabaseSchema + } + } else { + _, a, found := strings.Cut(jdbcURL, "currentSchema=") + + if found { + if strings.Contains(a, "&") { + b, _, found := strings.Cut(a, "&") + if found { + return b + } + } else { + return a + } + } + } + return defaultSchemaName +} + +func getQuarkusDataSourceFromPersistence(ctx context.Context, 
platform *operatorapi.SonataFlowPlatform, persistence *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + if persistence != nil && persistence.PostgreSQL != nil { + quarkusDataSource := &QuarkusDataSource{} + quarkusDataSource.JdbcUrl = persistence.PostgreSQL.JdbcUrl + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = getDBSchemaName(persistence.PostgreSQL, defaultSchemaName) + return quarkusDataSource + } + + return nil +} + +func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) *DBMigratorJob { + + diJobsBasedDBMigration := false + jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) { + quarkusDataSourceDataIndex := &QuarkusDataSource{} + quarkusDataSourceJobService := &QuarkusDataSource{} + + if diJobsBasedDBMigration { + quarkusDataSourceDataIndex = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.DataIndex.Persistence, "defaultDi") + } + + if jsJobsBasedDBMigration { + quarkusDataSourceJobService = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.JobService.Persistence, "defaultJs") Review Comment: same here, default schema 
name "defaultJs", not good. ########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + 
dbMigratorToolImage = "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" Review Comment: align with the new name apache/incubator-kie-kogito-db-migrator-tool and be sure that: 1. the tag is derived with a mechanism similar to what we have for the DataIndex and Jobs Service 2. we have a corresponding entry in the packages/sonataflow-operator/config/manager/controllers_cfg.yaml, and that we do an analogous processing to what we do for the DataIndex and Jobs Service images. i.e., if we have an image configured in the controllers_cfg.yaml, then, this image wins. see also, packages/sonataflow-operator/internal/controller/cfg/controllers_cfg.go and the processing we do for DI and JS. If not, we must still get the image name and mostly the tag from a common place, see: https://github.com/apache/incubator-kie-tools/blob/3933d90d67c8272813200b3c5615bf02a4ba49d0/packages/sonataflow-operator/internal/controller/platform/services/services.go#L141 ########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigratorToolImage = "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" + dbMigrationCmd = "./migration.sh" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string + MigrationCmd string +} + +func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { Review Comment: I think this method is not covering all cases well. Follow the PersistencePostgreSQL struct, and see: PostgreSQLServiceOptions type PostgreSQLServiceOptions struct { *SQLServiceOptions `json:",inline"` // Schema of postgresql database to be used. 
Defaults to "data-index-service" // +optional DatabaseSchema string `json:"databaseSchema,omitempty"` } Take a look at this also please: https://sonataflow.org/serverlessworkflow/main/cloud/operator/supporting-services.html#_configuring_the_supporting_services_persistence The services' persistence configuration supports various configuration alternatives. ########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigratorToolImage = "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" + dbMigrationCmd = "./migration.sh" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string + MigrationCmd string +} + +func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { + jdbcURL := persistencePostgreSQL.JdbcUrl + + if len(jdbcURL) == 0 { + if persistencePostgreSQL.ServiceRef != nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 { + return persistencePostgreSQL.ServiceRef.DatabaseSchema + } + } else { + _, a, found := strings.Cut(jdbcURL, "currentSchema=") + + if found { + if strings.Contains(a, "&") { + b, _, found := strings.Cut(a, "&") + if found { + return b + } + } else { + return a + } + } + } + return defaultSchemaName +} + +func getQuarkusDataSourceFromPersistence(ctx context.Context, 
platform *operatorapi.SonataFlowPlatform, persistence *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + if persistence != nil && persistence.PostgreSQL != nil { + quarkusDataSource := &QuarkusDataSource{} + quarkusDataSource.JdbcUrl = persistence.PostgreSQL.JdbcUrl + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = getDBSchemaName(persistence.PostgreSQL, defaultSchemaName) + return quarkusDataSource + } + + return nil +} + +func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) *DBMigratorJob { + + diJobsBasedDBMigration := false + jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) { + quarkusDataSourceDataIndex := &QuarkusDataSource{} + quarkusDataSourceJobService := &QuarkusDataSource{} + + if diJobsBasedDBMigration { + quarkusDataSourceDataIndex = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.DataIndex.Persistence, "defaultDi") + } + + if jsJobsBasedDBMigration { + quarkusDataSourceJobService = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.JobService.Persistence, "defaultJs") + } + + return &DBMigratorJob{ + 
MigrateDBDataIndex: diJobsBasedDBMigration, + DataIndexDataSource: quarkusDataSourceDataIndex, + MigrateDBJobsService: jsJobsBasedDBMigration, + JobsServiceDataSource: quarkusDataSourceJobService, + } + } + return nil +} + +func getNewQuarkusDataSource(jdbcURL string, userName string, password string, schema string) *QuarkusDataSource { + return &QuarkusDataSource{ + JdbcUrl: jdbcURL, + Username: userName, + Password: password, + Schema: schema, + } +} + +// CreateJobDBMigration Creates DB Migration Job and returns to the caller +func (dbmj DBMigratorJob) CreateJobDBMigration(platform *operatorapi.SonataFlowPlatform) *batchv1.Job { + + diQuarkusDataSource := getNewQuarkusDataSource("", "", "", "") + jsQuarkusDataSource := getNewQuarkusDataSource("", "", "", "") + + if dbmj.DataIndexDataSource != nil { + diQuarkusDataSource.JdbcUrl = dbmj.DataIndexDataSource.JdbcUrl + diQuarkusDataSource.Username = dbmj.DataIndexDataSource.Username + diQuarkusDataSource.Password = dbmj.DataIndexDataSource.Password + diQuarkusDataSource.Schema = dbmj.DataIndexDataSource.Schema + } + + if dbmj.JobsServiceDataSource != nil { + jsQuarkusDataSource.JdbcUrl = dbmj.JobsServiceDataSource.JdbcUrl + jsQuarkusDataSource.Username = dbmj.JobsServiceDataSource.Username + jsQuarkusDataSource.Password = dbmj.JobsServiceDataSource.Password + jsQuarkusDataSource.Schema = dbmj.JobsServiceDataSource.Schema + } + + dbMigrationJobCfg := getDBMigrationJobCfg() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: dbMigrationJobCfg.JobName, + Namespace: platform.Namespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: dbMigrationJobCfg.ContainerName, + Image: dbMigrationJobCfg.ToolImageName, + Env: []corev1.EnvVar{ + { + Name: "MIGRATE_DB_DATAINDEX", + Value: strconv.FormatBool(dbmj.MigrateDBDataIndex), + }, + { + Name: "QUARKUS_DATASOURCE_DATAINDEX_JDBC_URL", + Value: diQuarkusDataSource.JdbcUrl, + }, + { + 
Name: "QUARKUS_DATASOURCE_DATAINDEX_USERNAME", + Value: diQuarkusDataSource.Username, + }, + { + Name: "QUARKUS_DATASOURCE_DATAINDEX_PASSWORD", + Value: diQuarkusDataSource.Password, + }, + { + Name: "QUARKUS_FLYWAY_DATAINDEX_SCHEMAS", + Value: diQuarkusDataSource.Schema, + }, + { + Name: "MIGRATE_DB_JOBSSERVICE", + Value: strconv.FormatBool(dbmj.MigrateDBJobsService), + }, + { + Name: "QUARKUS_DATASOURCE_JOBSSERVICE_JDBC_URL", + Value: jsQuarkusDataSource.JdbcUrl, + }, + { + Name: "QUARKUS_DATASOURCE_JOBSSERVICE_USERNAME", + Value: jsQuarkusDataSource.Username, + }, + { + Name: "QUARKUS_DATASOURCE_JOBSSERVICE_PASSWORD", + Value: jsQuarkusDataSource.Password, + }, + { + Name: "QUARKUS_FLYWAY_JOBSSERVICE_SCHEMAS", + Value: jsQuarkusDataSource.Schema, + }, + }, + Command: []string{ + dbMigrationJobCfg.MigrationCmd, + }, + }, + }, + RestartPolicy: "Never", + }, + }, + BackoffLimit: pointer.Int32(0), + }, + } + return job +} + +// GetDBMigrationJobStatus Returns db migration job status +func (dbmj DBMigratorJob) GetDBMigrationJobStatus(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform) (*batchv1.JobStatus, error) { + job, err := client.BatchV1().Jobs(platform.Namespace).Get(ctx, dbMigrationJobName, metav1.GetOptions{}) + if err != nil { + klog.V(log.E).InfoS("Error getting DB migrator job while monitoring completion: ", "error", err) + return nil, err + } + return &job.Status, nil +} + +// NewSonataFlowPlatformDBMigrationPhase Returns a new DB migration phase for SonataFlowPlatform +func NewSonataFlowPlatformDBMigrationPhase(status operatorapi.DBMigrationStatus, message string, reason string) *operatorapi.SonataFlowPlatformDBMigrationPhase { + return &operatorapi.SonataFlowPlatformDBMigrationPhase{ + Status: status, + Message: message, + Reason: reason, + } +} + +// UpdateSonataFlowPlatformDBMigrationPhase Updates a given SonataFlowPlatformDBMigrationPhase with the supplied values +func 
UpdateSonataFlowPlatformDBMigrationPhase(dbMigrationStatus *operatorapi.SonataFlowPlatformDBMigrationPhase, status operatorapi.DBMigrationStatus, message string, reason string) *operatorapi.SonataFlowPlatformDBMigrationPhase { + if dbMigrationStatus != nil { + dbMigrationStatus.Status = status + dbMigrationStatus.Message = message + dbMigrationStatus.Reason = reason + return dbMigrationStatus + } + return nil +} + +func getDBMigrationJobCfg() *DBMigrationJobCfg { Review Comment: newDBMigrationJobCfg instead? ########## packages/sonataflow-operator/internal/controller/platform/db_migrator_job.go: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package platform + +import ( + "context" + "errors" + "strconv" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/container-builder/client" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" + "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" +) + +type QuarkusDataSource struct { + JdbcUrl string + Username string + Password string + Schema string +} + +type DBMigratorJob struct { + MigrateDBDataIndex bool + DataIndexDataSource *QuarkusDataSource + MigrateDBJobsService bool + JobsServiceDataSource *QuarkusDataSource +} + +const ( + dbMigrationJobName = "sonataflow-db-migrator-job" + dbMigrationContainerName = "db-migration-container" + dbMigratorToolImage = "quay.io/rhkp/incubator-kie-kogito-service-db-migration-postgresql:latest" + dbMigrationCmd = "./migration.sh" + dbMigrationJobFailed = 1 + dbMigrationJobSucceeded = 1 +) + +type DBMigrationJobCfg struct { + JobName string + ContainerName string + ToolImageName string + MigrationCmd string +} + +func getDBSchemaName(persistencePostgreSQL *operatorapi.PersistencePostgreSQL, defaultSchemaName string) string { + jdbcURL := persistencePostgreSQL.JdbcUrl + + if len(jdbcURL) == 0 { + if persistencePostgreSQL.ServiceRef != nil && len(persistencePostgreSQL.ServiceRef.DatabaseSchema) > 0 { + return persistencePostgreSQL.ServiceRef.DatabaseSchema + } + } else { + _, a, found := strings.Cut(jdbcURL, "currentSchema=") + + if found { + if strings.Contains(a, "&") { + b, _, found := strings.Cut(a, "&") + if found { + return b + } + } else { + return a + } + } + } + return defaultSchemaName +} + +func getQuarkusDataSourceFromPersistence(ctx context.Context, 
platform *operatorapi.SonataFlowPlatform, persistence *operatorapi.PersistenceOptionsSpec, defaultSchemaName string) *QuarkusDataSource { + if persistence != nil && persistence.PostgreSQL != nil { + quarkusDataSource := &QuarkusDataSource{} + quarkusDataSource.JdbcUrl = persistence.PostgreSQL.JdbcUrl + quarkusDataSource.Username, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.UserKey, platform.Namespace) + quarkusDataSource.Password, _ = services.GetSecretKeyValueString(ctx, persistence.PostgreSQL.SecretRef.Name, persistence.PostgreSQL.SecretRef.PasswordKey, platform.Namespace) + quarkusDataSource.Schema = getDBSchemaName(persistence.PostgreSQL, defaultSchemaName) + return quarkusDataSource + } + + return nil +} + +func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) *DBMigratorJob { + + diJobsBasedDBMigration := false + jsJobsBasedDBMigration := false + + if pshDI.IsPersistenceEnabledtInSpec() { + diJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.DataIndex.Persistence) + } + if pshJS.IsPersistenceEnabledtInSpec() { + jsJobsBasedDBMigration = services.IsJobsBasedDBMigration(platform.Spec.Services.JobService.Persistence) + } + + if (pshDI.IsServiceSetInSpec() && diJobsBasedDBMigration) || (pshJS.IsServiceSetInSpec() && jsJobsBasedDBMigration) { + quarkusDataSourceDataIndex := &QuarkusDataSource{} + quarkusDataSourceJobService := &QuarkusDataSource{} + + if diJobsBasedDBMigration { + quarkusDataSourceDataIndex = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.DataIndex.Persistence, "defaultDi") + } + + if jsJobsBasedDBMigration { + quarkusDataSourceJobService = getQuarkusDataSourceFromPersistence(ctx, platform, platform.Spec.Services.JobService.Persistence, "defaultJs") + } + + return &DBMigratorJob{ + 
MigrateDBDataIndex: diJobsBasedDBMigration, + DataIndexDataSource: quarkusDataSourceDataIndex, + MigrateDBJobsService: jsJobsBasedDBMigration, + JobsServiceDataSource: quarkusDataSourceJobService, + } + } + return nil +} + +func getNewQuarkusDataSource(jdbcURL string, userName string, password string, schema string) *QuarkusDataSource { + return &QuarkusDataSource{ + JdbcUrl: jdbcURL, + Username: userName, + Password: password, + Schema: schema, + } +} + +// CreateJobDBMigration Creates DB Migration Job and returns to the caller +func (dbmj DBMigratorJob) CreateJobDBMigration(platform *operatorapi.SonataFlowPlatform) *batchv1.Job { + + diQuarkusDataSource := getNewQuarkusDataSource("", "", "", "") + jsQuarkusDataSource := getNewQuarkusDataSource("", "", "", "") + + if dbmj.DataIndexDataSource != nil { + diQuarkusDataSource.JdbcUrl = dbmj.DataIndexDataSource.JdbcUrl + diQuarkusDataSource.Username = dbmj.DataIndexDataSource.Username + diQuarkusDataSource.Password = dbmj.DataIndexDataSource.Password + diQuarkusDataSource.Schema = dbmj.DataIndexDataSource.Schema + } + + if dbmj.JobsServiceDataSource != nil { + jsQuarkusDataSource.JdbcUrl = dbmj.JobsServiceDataSource.JdbcUrl + jsQuarkusDataSource.Username = dbmj.JobsServiceDataSource.Username + jsQuarkusDataSource.Password = dbmj.JobsServiceDataSource.Password + jsQuarkusDataSource.Schema = dbmj.JobsServiceDataSource.Schema + } + + dbMigrationJobCfg := getDBMigrationJobCfg() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: dbMigrationJobCfg.JobName, + Namespace: platform.Namespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: dbMigrationJobCfg.ContainerName, + Image: dbMigrationJobCfg.ToolImageName, + Env: []corev1.EnvVar{ + { + Name: "MIGRATE_DB_DATAINDEX", Review Comment: I think we should use constants for these env var names. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
