jordigilh commented on code in PR #345: URL: https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/345#discussion_r1462268278
########## controllers/clusterplatform/initialize.go: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + "fmt" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// NewInitializeAction returns an action that initializes the platform configuration when not provided by the user. +func NewInitializeAction() Action { + return &initializeAction{} +} + +type initializeAction struct { + baseAction +} + +func (action *initializeAction) Name() string { + return "initialize" +} + +func (action *initializeAction) CanHandle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) bool { + return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx, action.client) +} + +func (action *initializeAction) Handle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) error { + duplicate, err := action.isPrimaryDuplicate(ctx, cPlatform) + if err != nil { + return err + } + if duplicate { + // another cluster platform already present + if !cPlatform.Status.IsDuplicated() { + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "") + } + return nil + } + cPlatform.Status.Version = metadata.SpecVersion + platformRef := cPlatform.Spec.PlatformRef + + // Check referenced platform status + platform := &operatorapi.SonataFlowPlatform{} + err = action.client.Get(ctx, types.NamespacedName{Namespace: platformRef.Namespace, Name: platformRef.Name}, platform) + if err != nil { + if k8serrors.IsNotFound(err) { + klog.V(log.D).InfoS("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace) + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformNotFoundReason, + fmt.Sprintf("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace)) + return nil + } + return err + } + + if platform != nil { + condition := platform.Status.GetTopLevelCondition() Review Comment: yeah, something like that, but if it's not required. Just a thought to make it more verbose for troubleshooting. ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. +func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] Review Comment: Ack ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. +func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("Not found a cluster platform") + return nil, nil +} + +// listPrimaryClusterPlatforms returns all non-secondary cluster platforms installed (only one will be active). +func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { Review Comment: Ack ########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. +func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") + return nil + } + + if sfcPlatform != nil { + sfpcRefNsName := types.NamespacedName{Namespace: sfcPlatform.Spec.PlatformRef.Namespace, Name: sfcPlatform.Spec.PlatformRef.Name} + if client.ObjectKeyFromObject(platform) == sfpcRefNsName { + return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(sfcPlatform)}} + } + } + return nil +} + +// if active sonataflowclusterplatform is changed, reconcile other SonataFlowClusterPlatforms. +func (r *SonataFlowClusterPlatformReconciler) mapClusterPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + sfcPlatform := object.(*operatorapi.SonataFlowClusterPlatform) + var requests []reconcile.Request + if sfcPlatform != nil && clusterplatform.IsActive(sfcPlatform) { + var scpList operatorapi.SonataFlowClusterPlatformList + if err := r.List(ctx, &scpList); err != nil { Review Comment: ack. ########## controllers/clusterplatform/initialize.go: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + "fmt" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// NewInitializeAction returns an action that initializes the platform configuration when not provided by the user. +func NewInitializeAction() Action { + return &initializeAction{} +} + +type initializeAction struct { + baseAction +} + +func (action *initializeAction) Name() string { + return "initialize" +} + +func (action *initializeAction) CanHandle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) bool { + return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx, action.client) +} + +func (action *initializeAction) Handle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) error { + duplicate, err := action.isPrimaryDuplicate(ctx, cPlatform) + if err != nil { + return err + } + if duplicate { + // another cluster platform already present + if !cPlatform.Status.IsDuplicated() { + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "") + } + return nil + } + cPlatform.Status.Version = metadata.SpecVersion + platformRef := cPlatform.Spec.PlatformRef + + // Check referenced platform status + platform := &operatorapi.SonataFlowPlatform{} + err = action.client.Get(ctx, types.NamespacedName{Namespace: platformRef.Namespace, Name: platformRef.Name}, platform) + if err != nil { + if k8serrors.IsNotFound(err) { + klog.V(log.D).InfoS("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace) + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformNotFoundReason, + fmt.Sprintf("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace)) + return nil + } + return err + } + + if platform != nil { Review Comment: Ack ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. +func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("Not found a cluster platform") + return nil, nil +} + +// listPrimaryClusterPlatforms returns all non-secondary cluster platforms installed (only one will be active). +func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { + lst, err := listAllClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + filtered := &operatorapi.SonataFlowClusterPlatformList{} + for i := range lst.Items { + cPl := lst.Items[i] + if !IsSecondary(&cPl) { + filtered.Items = append(filtered.Items, cPl) + } + } + return filtered, nil +} + +// allDuplicatedClusterPlatforms returns true if every cluster platform has a "Duplicated" status set +func allDuplicatedClusterPlatforms(ctx context.Context, c ctrl.Reader) bool { + lst, err := listAllClusterPlatforms(ctx, c) + if err != nil { + return false + } + + for i := range lst.Items { + if !lst.Items[i].Status.IsDuplicated() { + return false + } + } + + return true +} + +// listAllClusterPlatforms returns all clusterplatforms installed. +func listAllClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { + lst := operatorapi.NewSonataFlowClusterPlatformList() + if err := c.List(ctx, &lst); err != nil { + return nil, err + } + return &lst, nil +} + +// IsActive determines if the given cluster platform is being used. +func IsActive(p *operatorapi.SonataFlowClusterPlatform) bool { + return p.Status.IsReady() && !p.Status.IsDuplicated() +} + +// IsSecondary determines if the given cluster platform is marked as secondary. Review Comment: Ack ########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. +func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") Review Comment: ack ########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { Review Comment: ack ########## controllers/platform/services/services.go: ########## @@ -271,6 +337,56 @@ func (j JobService) GetServiceCmName() string { return fmt.Sprintf("%s-props", j.GetServiceName()) } +func (j JobService) SetServiceUrlInStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) { + psJS := NewJobService(clusterRefPlatform) + if !isServicesSet(j.platform) && psJS.IsServiceEnabledInSpec() { Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
