jordigilh commented on code in PR #345: URL: https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/345#discussion_r1456433691
########## api/v1alpha08/sonataflowclusterplatform_types.go: ########## @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v1alpha08 + +import ( + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // SonataFlowClusterPlatformKind is the Kind name of the SonataFlowClusterPlatform CR + SonataFlowClusterPlatformKind string = "SonataFlowClusterPlatform" + PlatformNotFoundReason string = "PlatformNotFound" +) + +// SonataFlowClusterPlatformSpec defines the desired state of SonataFlowClusterPlatform +type SonataFlowClusterPlatformSpec struct { + // PlatformRef defines which existing SonataFlowPlatform's services should be used cluster-wide. + PlatformRef SonataFlowPlatformRef `json:"platformRef"` +} + +// SonataFlowPlatformRef defines which existing SonataFlowPlatform should be used cluster-wide +type SonataFlowPlatformRef struct { + // Name of the SonataFlowPlatform + //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Platform_Name" + Name string `json:"name"` Review Comment: How many platform CRs can we have in a given namespace? If only 1 then there is no need here for this field. 
########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. 
+func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("Not found a cluster platform") Review Comment: `s/Not found a cluster platform/No cluster platform found` ? ########## api/v1alpha08/sonataflowplatform_types.go: ########## @@ -79,6 +79,35 @@ type SonataFlowPlatformStatus struct { // Info generic information related to the build //+operator-sdk:csv:customresourcedefinitions:type=status,displayName="info" Info map[string]string `json:"info,omitempty"` + // ClusterPlatformRef information related to the (optional) active SonataFlowClusterPlatform + ClusterPlatformRef *SonataFlowClusterPlatformRef `json:"clusterPlatformRef,omitempty"` +} + +// SonataFlowClusterPlatformRef information related to the (optional) active SonataFlowClusterPlatform +// +k8s:openapi-gen=true +type SonataFlowClusterPlatformRef struct { + // Name of the active SonataFlowClusterPlatform + Name string `json:"name,omitempty"` + // PlatformRef displays which SonataFlowPlatform has been referenced by the active SonataFlowClusterPlatform + PlatformRef SonataFlowPlatformRef `json:"platformRef,omitempty"` + // Services displays which cluster-wide services are being used by this SonataFlowPlatform + Services *PlatformServices `json:"services,omitempty"` +} + +// PlatformServices displays which cluster-wide services are being used by a 
SonataFlowPlatform +// +k8s:openapi-gen=true +type PlatformServices struct { + // DataIndexRef displays information on the cluster-wide Data Index service + DataIndexRef *PlatformServiceRef `json:"dataIndexRef,omitempty"` + // JobServiceRef displays information on the cluster-wide Job Service + JobServiceRef *PlatformServiceRef `json:"jobServiceRef,omitempty"` +} + +// PlatformServiceRef displays information on a cluster-wide service +// +k8s:openapi-gen=true +type PlatformServiceRef struct { + // Url displays the base url of a cluster-wide service + Url string `json:"url,omitempty"` Review Comment: I would be cautious of using an unstructured string to capture configuration, it could lead to unexpected scenarios with malformed parsed values or even, such as in this case, where the value could lead to an endpoint outside the cluster. Unless there is a strong need to have this kind of wildcard values I'd avoid them for now. ########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. 
+// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { Review Comment: If the object is not found, shouldn't that mean that it was a delete event that triggered the reconciliation and the platform CRs that refer to it need to be updated removing the reference to the cluster platform CR? ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. +func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] Review Comment: This behavior is non-deterministic as the function can return different objects depending on how the list is received from the APIServer (returning the first item in the list without any criteria). ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. 
+func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("Not found a cluster platform") + return nil, nil +} + +// listPrimaryClusterPlatforms returns all non-secondary cluster platforms installed (only one will be active). +func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { + lst, err := listAllClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + filtered := &operatorapi.SonataFlowClusterPlatformList{} + for i := range lst.Items { + cPl := lst.Items[i] + if !IsSecondary(&cPl) { + filtered.Items = append(filtered.Items, cPl) + } + } + return filtered, nil +} + +// allDuplicatedClusterPlatforms returns true if every cluster platform has a "Duplicated" status set +func allDuplicatedClusterPlatforms(ctx context.Context, c ctrl.Reader) bool { + lst, err := listAllClusterPlatforms(ctx, c) + if err != nil { + return false + } + + for i := range lst.Items { + if !lst.Items[i].Status.IsDuplicated() { + return false + } + } + + return true +} + +// listAllClusterPlatforms returns all clusterplatforms installed. 
+func listAllClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { + lst := operatorapi.NewSonataFlowClusterPlatformList() + if err := c.List(ctx, &lst); err != nil { + return nil, err + } + return &lst, nil +} + +// IsActive determines if the given cluster platform is being used. +func IsActive(p *operatorapi.SonataFlowClusterPlatform) bool { + return p.Status.IsReady() && !p.Status.IsDuplicated() +} + +// IsSecondary determines if the given cluster platform is marked as secondary. Review Comment: Can you share some extra insight as in which scenario makes sense to have a secondary CR? I'm not clear where this coupled with the idea of duplicated CRs could help the user manage the platform services in the cluster. ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) Review Comment: What is the benefit from a user perspective to have an active/inactive CR when inactive is not being used by the cluster as reference? Keep in mind that we'll be storing unused objects in etcd that don't impact the cluster for as long as they're inactive and only take space. ########## controllers/clusterplatform/initialize.go: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package clusterplatform + +import ( + "context" + "fmt" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// NewInitializeAction returns an action that initializes the platform configuration when not provided by the user. +func NewInitializeAction() Action { + return &initializeAction{} +} + +type initializeAction struct { + baseAction +} + +func (action *initializeAction) Name() string { + return "initialize" +} + +func (action *initializeAction) CanHandle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) bool { + return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx, action.client) +} + +func (action *initializeAction) Handle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) error { + duplicate, err := action.isPrimaryDuplicate(ctx, cPlatform) + if err != nil { + return err + } + if duplicate { + // another cluster platform already present + if !cPlatform.Status.IsDuplicated() { + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "") + } + return nil + } + cPlatform.Status.Version = metadata.SpecVersion + platformRef := cPlatform.Spec.PlatformRef + + // Check referenced platform status + platform := &operatorapi.SonataFlowPlatform{} + err = action.client.Get(ctx, types.NamespacedName{Namespace: platformRef.Namespace, Name: platformRef.Name}, platform) + if err != nil { + if k8serrors.IsNotFound(err) { + klog.V(log.D).InfoS("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace) + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, 
operatorapi.PlatformNotFoundReason, + fmt.Sprintf("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace)) + return nil + } + return err + } + + if platform != nil { + condition := platform.Status.GetTopLevelCondition() Review Comment: shouldn't it also check if the referenced platform has at least one service defined and set it as part of the status? This can help the user to know what services are expected to be available to other namespaces from the cluster platform. WDYT? ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. 
+func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. +func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("Not found a cluster platform") + return nil, nil Review Comment: shouldn't it return an error to say that no cluster platform is found? similar to when you `get` an object from k8s and none matches or is found and you get an error of type `IsNotFound`? After all it's expected to return at least one CR from what I understand based on the description of the function. ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. Review Comment: nit: `currently installed cluster platform` to `currently active cluster platform` ########## controllers/platform/k8s.go: ########## @@ -62,15 +61,19 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S return nil, err } - if platform.Spec.Services.DataIndex != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexService(platform)); err != nil { - return nil, err + if platform.Spec.Services != nil { + psDI := services.NewDataIndexService(platform) + if psDI.IsServiceSetInSpec() { + if err := createServiceComponents(ctx, action.client, platform, psDI); err != nil { Review Comment: shouldn't it reconcile the deployment rather than create a new one? 
########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized 
using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. 
+// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform 
condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. +func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) Review Comment: this variable is not needed, you can pass the object itself directly: ``` if client.ObjectKeyFromObject(object) == sfpcRefNsName { ``` ########## controllers/clusterplatform/initialize.go: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + "fmt" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// NewInitializeAction returns an action that initializes the platform configuration when not provided by the user. 
+func NewInitializeAction() Action { + return &initializeAction{} +} + +type initializeAction struct { + baseAction +} + +func (action *initializeAction) Name() string { + return "initialize" +} + +func (action *initializeAction) CanHandle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) bool { + return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx, action.client) +} + +func (action *initializeAction) Handle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) error { + duplicate, err := action.isPrimaryDuplicate(ctx, cPlatform) + if err != nil { + return err + } + if duplicate { + // another cluster platform already present + if !cPlatform.Status.IsDuplicated() { + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "") + } + return nil + } + cPlatform.Status.Version = metadata.SpecVersion + platformRef := cPlatform.Spec.PlatformRef + + // Check referenced platform status + platform := &operatorapi.SonataFlowPlatform{} + err = action.client.Get(ctx, types.NamespacedName{Namespace: platformRef.Namespace, Name: platformRef.Name}, platform) + if err != nil { + if k8serrors.IsNotFound(err) { + klog.V(log.D).InfoS("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace) + cPlatform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformNotFoundReason, + fmt.Sprintf("%s platform does not exist in %s namespace.", platformRef.Name, platformRef.Namespace)) + return nil + } + return err + } + + if platform != nil { Review Comment: It would also be helpful for the user to know that the services are reachable between namespaces belonging to the same cluster. It's a check that can help troubleshoot misconfigured environments. 
########## controllers/platform/services/services.go: ########## @@ -271,6 +337,56 @@ func (j JobService) GetServiceCmName() string { return fmt.Sprintf("%s-props", j.GetServiceName()) } +func (j JobService) SetServiceUrlInStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) { + psJS := NewJobService(clusterRefPlatform) + if !isServicesSet(j.platform) && psJS.IsServiceEnabledInSpec() { Review Comment: What about the configmap application.properties for the services? shouldn't that be reconciled as well? Removing the data index service will impact the job service application properties. ########## controllers/platform/k8s.go: ########## @@ -62,15 +61,19 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S return nil, err } - if platform.Spec.Services.DataIndex != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexService(platform)); err != nil { - return nil, err + if platform.Spec.Services != nil { + psDI := services.NewDataIndexService(platform) + if psDI.IsServiceSetInSpec() { + if err := createServiceComponents(ctx, action.client, platform, psDI); err != nil { + return nil, err + } } - } - if platform.Spec.Services.JobService != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewJobService(platform)); err != nil { - return nil, err + psJS := services.NewJobService(platform) + if psJS.IsServiceSetInSpec() { + if err := createServiceComponents(ctx, action.client, platform, psJS); err != nil { Review Comment: same here, the JS service spec could have changed and that would mean changes to the deployment rather than create a new one. ########## controllers/clusterplatform/clusterplatform.go: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package clusterplatform + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// GetActiveClusterPlatform returns the currently installed active cluster platform. +func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) { + return getClusterPlatform(ctx, c, true) +} + +// getClusterPlatform returns the currently installed cluster platform or any platform existing in the cluster. 
+func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) { + klog.V(log.D).InfoS("Finding available cluster platforms") + + lst, err := listPrimaryClusterPlatforms(ctx, c) + if err != nil { + return nil, err + } + + for _, cPlatform := range lst.Items { + if IsActive(&cPlatform) { + klog.V(log.D).InfoS("Found active cluster platform", "platform", cPlatform.Name) + return &cPlatform, nil + } + } + + if !active && len(lst.Items) > 0 { + // does not require the cluster platform to be active, just return one if present + res := lst.Items[0] + klog.V(log.D).InfoS("Found cluster platform", "platform", res.Name) + return &res, nil + } + klog.V(log.I).InfoS("Not found a cluster platform") + return nil, nil +} + +// listPrimaryClusterPlatforms returns all non-secondary cluster platforms installed (only one will be active). +func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) { Review Comment: Curious why you use the `operatorapi.SonataFlowClusterPlatformList` instead of a simple slice `[]operatorapi.SonataFlowClusterPlatform`. The `SonataFlowClusterPlatformList` struct contains a TypeMeta and ObjectMeta fields that you don't seem to use later on. ########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete 
+//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update 
SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. 
+func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") Review Comment: It feels more like information rather than an error. After all it's not mandatory to have an active cluster platform CR, right? ########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. 
+// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", 
instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. 
+func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") + return nil + } + + if sfcPlatform != nil { + sfpcRefNsName := types.NamespacedName{Namespace: sfcPlatform.Spec.PlatformRef.Namespace, Name: sfcPlatform.Spec.PlatformRef.Name} + if client.ObjectKeyFromObject(platform) == sfpcRefNsName { + return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(sfcPlatform)}} + } + } + return nil +} + +// if active sonataflowclusterplatform is changed, reconcile other SonataFlowClusterPlatforms. +func (r *SonataFlowClusterPlatformReconciler) mapClusterPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + sfcPlatform := object.(*operatorapi.SonataFlowClusterPlatform) + var requests []reconcile.Request + if sfcPlatform != nil && clusterplatform.IsActive(sfcPlatform) { + var scpList operatorapi.SonataFlowClusterPlatformList + if err := r.List(ctx, &scpList); err != nil { + klog.V(log.E).ErrorS(err, "Could not list SonataFlowClusterPlatforms. "+ + "SonataFlowClusterPlatforms affected by changes to the active SonataFlowClusterPlatform %s will not be reconciled.", + sfcPlatform.Name) + return nil + } + + for _, cPlatform := range scpList.Items { + namespacedName := client.ObjectKeyFromObject(&cPlatform) + // this check is required so that the active clusterplatform object doesn't reconcile + if client.ObjectKeyFromObject(sfcPlatform) != namespacedName { Review Comment: nit: create a variable outside the loop to contain the value from `client.ObjectKeyFromObject(sfcPlatform)` and reference it inside the loop. 
########## controllers/sonataflowclusterplatform_controller.go: ########## @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/apache/incubator-kie-kogito-serverless-operator/api" + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + clientr "github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/clusterplatform" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrlrun "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// SonataFlowClusterPlatformReconciler reconciles a SonataFlowClusterPlatform object +type SonataFlowClusterPlatformReconciler struct { + // This Client, initialized 
using mgr.Client() above, is a split Client + // that reads objects from the cache and writes to the API server + ctrl.Client + // Non-caching Client + Reader ctrl.Reader + Scheme *runtime.Scheme + Config *rest.Config + Recorder record.EventRecorder +} + +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowclusterplatforms/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the SonataFlowClusterPlatform object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. 
+// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile +func (r *SonataFlowClusterPlatformReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + + // Fetch the SonataFlowClusterPlatform instance + var instance operatorapi.SonataFlowClusterPlatform + + err := r.Client.Get(ctx, req.NamespacedName, &instance) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + klog.V(log.E).ErrorS(err, "Failed to get SonataFlowClusterPlatform") + return reconcile.Result{}, err + } + + instance.Status.Manager().InitializeConditions() + + cli, _ := clientr.FromCtrlClientSchemeAndConfig(r.Client, r.Scheme, r.Config) + action := clusterplatform.NewInitializeAction() + action.InjectClient(cli) + klog.V(log.I).InfoS("Invoking action", "Name", action.Name()) + + target := instance.DeepCopy() + + if action.CanHandle(ctx, target) { + if err = action.Handle(ctx, target); err != nil { + target.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformFailureReason, err.Error()) + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + return reconcile.Result{}, err + } + r.Recorder.Event(&instance, corev1.EventTypeWarning, "Failed", fmt.Sprintf("Failed to update SonataFlowClusterPlaform: %s", err)) + return reconcile.Result{}, err + } + + if target != nil { + target.Status.ObservedGeneration = instance.Generation + + if err := r.Client.Status().Patch(ctx, target, ctrl.MergeFrom(&instance)); err != nil { + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Status Updated", fmt.Sprintf("Updated cluster platform condition %s", instance.Status.GetTopLevelCondition())) + return reconcile.Result{}, err + } + } + + // handle one action at time so the resource + // is always at its latest state + r.Recorder.Event(&instance, corev1.EventTypeNormal, "Updated", fmt.Sprintf("Updated cluster platform
condition to %s", instance.Status.GetTopLevelCondition())) + + if target != nil && target.Status.IsReady() { + return reconcile.Result{}, nil + } + + // Requeue + return reconcile.Result{ + RequeueAfter: 5 * time.Second, + }, nil + } + + return reconcile.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manager) error { + return ctrlrun.NewControllerManagedBy(mgr). + For(&operatorapi.SonataFlowClusterPlatform{}). + Watches(&operatorapi.SonataFlowPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapPlatformToClusterPlatformRequests)). + Watches(&operatorapi.SonataFlowClusterPlatform{}, handler.EnqueueRequestsFromMapFunc(r.mapClusterPlatformToClusterPlatformRequests)). + Complete(r) +} + +// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform. +func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + platform := object.(*operatorapi.SonataFlowPlatform) + sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client) + if err != nil && !errors.IsNotFound(err) { + klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform") + return nil + } + + if sfcPlatform != nil { + sfpcRefNsName := types.NamespacedName{Namespace: sfcPlatform.Spec.PlatformRef.Namespace, Name: sfcPlatform.Spec.PlatformRef.Name} + if client.ObjectKeyFromObject(platform) == sfpcRefNsName { + return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(sfcPlatform)}} + } + } + return nil +} + +// if active sonataflowclusterplatform is changed, reconcile other SonataFlowClusterPlatforms. 
+func (r *SonataFlowClusterPlatformReconciler) mapClusterPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request { + sfcPlatform := object.(*operatorapi.SonataFlowClusterPlatform) + var requests []reconcile.Request + if sfcPlatform != nil && clusterplatform.IsActive(sfcPlatform) { + var scpList operatorapi.SonataFlowClusterPlatformList + if err := r.List(ctx, &scpList); err != nil { Review Comment: nit: you could use a field selector to filter out the result list and remove the need for the check. Also, you wouldn't need to check for the namespace where the object resides since it's a cluster-scoped object. Just the name would suffice. ``` fs, err := fields.ParseSelector(fmt.Sprintf("metadata.name!=%s", sfcPlatform.Name)) if err != nil { klog.V(log.E).ErrorS(err, "Failed to parse field selector for cluster platform CR %s", sfcPlatform.Name) return nil } if err := r.List(ctx, &scpList, &ctrl.ListOptions{FieldSelector: fs}); err != nil { ... } for _, cPlatform := range scpList.Items { requests = append(requests, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&cPlatform)}) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@kie.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@kie.apache.org For additional commands, e-mail: commits-help@kie.apache.org
