ricardozanini commented on code in PR #467:
URL:
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/467#discussion_r1606895565
##########
api/v1alpha08/sonataflowplatform_types.go:
##########
@@ -61,6 +65,15 @@ type SonataFlowPlatformSpec struct {
Properties *PropertyPlatformSpec `json:"properties,omitempty"`
}
+// PlatformEventingSpec specifies the broker to be used as the default sink or
source
Review Comment:
```suggestion
// PlatformEventingSpec specifies the Knative Eventing integration details
in the platform.
```
##########
api/v1alpha08/sonataflowplatform_types.go:
##########
@@ -47,6 +48,9 @@ type SonataFlowPlatformSpec struct {
// +optional
//
+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Services"
Services *ServicesPlatformSpec `json:"services,omitempty"`
+ // Eventing defines a broker as the default broker when the workflow,
Dataindex, or Jobservice does not have a sink or source specified.
Review Comment:
```suggestion
// Eventing describes the information required for Knative Eventing
integration in the platform.
```
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the
platform referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ }
+ return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
Review Comment:
Can you keep the `const` at the top of the file? It's easier to maintain
later.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the
platform referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ }
+ return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef
*duckv1.KReference) (*unstructured.Unstructured, error) {
+ dynamicClient, err := dynamic.NewForConfig(cfg)
+ if err != nil {
+ return nil, err
+ }
+ gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+ resourceId := schema.GroupVersionResource{
+ Group: gv.Group,
+ Version: gv.Version,
+ Resource: kRef.Kind,
+ }
+ if len(kRef.Namespace) == 0 {
+ return nil, fmt.Errorf("namespace for knative resource %s is
missing", kRef.Name)
+ }
+ list, err :=
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx,
metav1.ListOptions{})
+ fmt.Printf("list:%v", list)
Review Comment:
Debug leftovers, right? :D
##########
controllers/platform/services/knative.go:
##########
@@ -0,0 +1,81 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
+
+import (
+ "context"
+ "fmt"
+
+
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
+
+
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/dynamic"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+)
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func KnativeResourceExists(ctx context.Context, c client.Client, dest
*duckv1.Destination, nsDefault string) (*unstructured.Unstructured, error) {
+ dynamicClient, err := dynamic.NewForConfig(c.GetConfig())
+ if err != nil {
+ return nil, err
+ }
+ resourceId := schema.GroupVersionResource{
+ Group: dest.Ref.Group,
+ Version: dest.Ref.APIVersion,
+ Resource: dest.Ref.Kind,
+ }
+ namespace := dest.Ref.Namespace
+ if len(namespace) == 0 {
+ namespace = nsDefault
+ }
+ return dynamicClient.Resource(resourceId).Namespace(namespace).Get(ctx,
dest.Ref.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(ctx context.Context, c client.Client, dest
*duckv1.Destination, namespace string) (bool, error) {
+ if dest == nil {
+ return false, fmt.Errorf("broker destinition is empty")
+ }
+ knativeAvail := GetKnativeAvailability(c)
+ if !knativeAvail.Eventing {
+ return false, fmt.Errorf("knative eventing is not deployed in
the cluster")
+ }
+ obj, err := KnativeResourceExists(ctx, c, dest, namespace)
+ if err != nil {
+ return false, fmt.Errorf("failed to find knative resource %v",
dest)
+ }
+ annotations := obj.GetAnnotations()
+ if len(annotations) > 0 {
+ if _, ok := annotations[knativeBrokerAnnotation]; ok {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func GetKnativeAvailability(c client.Client) *knative.Availability {
Review Comment:
I think we already have something in place:
https://github.com/apache/incubator-kie-kogito-serverless-operator/blob/main/controllers/knative/knative.go#L67
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
Review Comment:
Don't we already have a function that reads the platform using similar logic?
We pass the `SonataFlowPlatform` and get back either itself or the cluster ref?
##########
controllers/profiles/common/object_creators.go:
##########
@@ -51,14 +54,17 @@ import (
// ObjectCreator is the func that creates the initial reference object, if the
object doesn't exist in the cluster, this one is created.
// Can be used as a reference to keep the object immutable
-type ObjectCreator func(workflow *operatorapi.SonataFlow) (client.Object,
error)
+type ObjectCreator func(workflow *operatorapi.SonataFlow, support
*StateSupport) (client.Object, error)
Review Comment:
Please review this API change; it's unnecessary if all you need is the
`client`. It brings a lot of changes to the code base. Same for the `Ensurer`.
##########
controllers/profiles/common/properties/managed.go:
##########
@@ -183,10 +185,10 @@ func NewManagedPropertyHandler(workflow
*operatorapi.SonataFlow, platform *opera
return handler.withKogitoServiceUrl(), nil
}
-// ApplicationManagedProperties immutable default application properties that
can be used with any workflow based on Quarkus.
-// Alias for NewManagedPropertyHandler(workflow).Build()
-func ApplicationManagedProperties(workflow *operatorapi.SonataFlow, platform
*operatorapi.SonataFlowPlatform) (string, error) {
- p, err := NewManagedPropertyHandler(workflow, platform)
+// ImmutableApplicationProperties immutable default application properties
that can be used with any workflow based on Quarkus.
+// Alias for NewAppPropertyHandler(workflow).Build()
+func ImmutableApplicationProperties(ctx context.Context, c client.Client,
workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform)
(string, error) {
Review Comment:
Why have you changed this function name?
##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -190,8 +216,34 @@ func RolloutDeploymentIfCMChangedMutateVisitor(workflow
*operatorapi.SonataFlow,
return func(object client.Object) controllerutil.MutateFn {
return func() error {
deployment := object.(*appsv1.Deployment)
+ restoreKnativeVolumeAndVolumeMount(deployment)
err :=
kubeutil.AnnotateDeploymentConfigChecksum(workflow, deployment, userPropsCM,
managedPropsCM)
return err
}
}
}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+ for i := 0; i < len(vols)-1; i++ {
+ if vols[i].Name == KnativeBundleVolume {
+ vols[i], vols[i+1] = vols[i+1], vols[i]
+ }
+ }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+ for i := 0; i < len(mounts)-1; i++ {
+ if mounts[i].Name == KnativeBundleVolume {
+ mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+ }
+ }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The
volume and volume mount
+// must be in the end of the array to avoid repeadly restarting of the
workflow pod
+func restoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
+ moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
+ for i := 0; i < len(deployment.Spec.Template.Spec.Containers); i++ {
+
moveKnativeVolumeMountToEnd(deployment.Spec.Template.Spec.Containers[i].VolumeMounts)
+ }
+}
Review Comment:
Please move these knative specifics to a `knative.go` file in this module.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
Review Comment:
I think you can simplify this conditional with `pl != nil &&
pl.Spec.Eventing != nil`.
##########
api/v1alpha08/sonataflowplatform_types.go:
##########
@@ -61,6 +65,15 @@ type SonataFlowPlatformSpec struct {
Properties *PropertyPlatformSpec `json:"properties,omitempty"`
}
+// PlatformEventingSpec specifies the broker to be used as the default sink or
source
+// +k8s:openapi-gen=true
+type PlatformEventingSpec struct {
+ // Broker to commute with workflow deployment
Review Comment:
```suggestion
// Broker to communicate with workflow deployment. It can be the default
broker when the workflow, Dataindex, or Jobservice does not have a sink or
source specified.
```
##########
controllers/platform/services/properties.go:
##########
@@ -157,11 +160,15 @@ func generateReactiveURL(postgresSpec
*operatorapi.PersistencePostgreSQL, schema
// with the Data Index. For the calculation this function considers if the
Data Index is present in the
// SonataFlowPlatform, if not present, no properties.
// Never nil.
-func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow,
platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
+func GenerateDataIndexWorkflowProperties(ctx context.Context, client
client.Client, workflow *operatorapi.SonataFlow, platform
*operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
Review Comment:
We can avoid passing `client` here.
##########
controllers/platform/k8s.go:
##########
@@ -61,14 +62,14 @@ func (action *serviceAction) Handle(ctx context.Context,
platform *operatorapi.S
return nil, err
}
- psDI := services.NewDataIndexHandler(platform)
+ psDI := services.NewDataIndexHandler(ctx, action.client, platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client,
platform, psDI); err != nil {
return nil, err
}
}
- psJS := services.NewJobServiceHandler(platform)
+ psJS := services.NewJobServiceHandler(ctx, action.client, platform)
Review Comment:
There's no need to pass the `client` reference anymore. We can access it
from the `utils` package.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the
platform referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ }
+ return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef
*duckv1.KReference) (*unstructured.Unstructured, error) {
+ dynamicClient, err := dynamic.NewForConfig(cfg)
+ if err != nil {
+ return nil, err
+ }
+ gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+ resourceId := schema.GroupVersionResource{
+ Group: gv.Group,
+ Version: gv.Version,
+ Resource: kRef.Kind,
+ }
+ if len(kRef.Namespace) == 0 {
+ return nil, fmt.Errorf("namespace for knative resource %s is
missing", kRef.Name)
+ }
+ list, err :=
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx,
metav1.ListOptions{})
+ fmt.Printf("list:%v", list)
+ return
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx,
kRef.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func IsKnativeEnvInjected(ctx context.Context, c client.Client,
deploymentName, namespace string) (bool, error) {
Review Comment:
If this is a public function and the `Deployment` is ours, can you change
the input param to receive a `SonataFlow` reference instead?
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the
platform referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ }
+ return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef
*duckv1.KReference) (*unstructured.Unstructured, error) {
Review Comment:
Doesn't Knative already have a similar function that does this?
##########
controllers/platform/services/properties.go:
##########
@@ -183,19 +199,29 @@ func GenerateDataIndexWorkflowProperties(workflow
*operatorapi.SonataFlow, platf
// with the Job Service. For the calculation this function considers if the
Job Service is present in the
// SonataFlowPlatform, if not present, no properties.
// Never nil.
-func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow,
platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
+func GenerateJobServiceWorkflowProperties(ctx context.Context, client
client.Client, workflow *operatorapi.SonataFlow, platform
*operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
Review Comment:
Same about `client`.
##########
controllers/platform/services/services.go:
##########
@@ -505,3 +576,167 @@ func mergeContainerPreservingEnvVars(dest
*corev1.Container, source *corev1.Cont
}
return nil
}
+
+// GetPlatformBroker gets the default broker for the platform.
+func GetPlatformBroker(platform *operatorapi.SonataFlowPlatform)
*duckv1.Destination {
+ if platform != nil && platform.Spec.Eventing != nil &&
platform.Spec.Eventing.Broker != nil {
+ return platform.Spec.Eventing.Broker
+ }
+ return nil
+}
+
+func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
+ if d.platform != nil && d.platform.Spec.Services.DataIndex.Source !=
nil && d.platform.Spec.Services.DataIndex.Source.Ref != nil {
+ return d.platform.Spec.Services.DataIndex.Source
+ }
+ return GetPlatformBroker(d.platform)
+}
+
+func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName,
namespace, platformName, serviceName, tag, eventType, path string)
*eventingv1.Trigger {
+ return &eventingv1.Trigger{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%s-data-index-%s-trigger",
platformName, tag),
+ Namespace: namespace,
+ Labels: labels,
+ },
+ Spec: eventingv1.TriggerSpec{
+ Broker: brokerName,
+ Filter: &eventingv1.TriggerFilter{
+ Attributes: eventingv1.TriggerFilterAttributes{
+ "type": eventType,
+ },
+ },
+ Subscriber: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ Name: serviceName,
+ Namespace: namespace,
+ APIVersion: "v1",
+ Kind: "Service",
+ },
+ URI: &apis.URL{
+ Path: path,
+ },
+ },
+ },
+ }
+}
+func (d *DataIndexHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
error) {
+ broker := d.GetSourceBroker()
+ if broker == nil {
+ return nil, nil // Nothing to do
+ }
+ brokerName := broker.Ref.Name
+ namespace := platform.Namespace
+ serviceName := d.GetServiceName()
+ return []client.Object{
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-variable", "ProcessInstanceVariableDataEvent",
pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "jobs", "JobEvent", pathJobs)}, nil
+}
+
+func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
+ if d.platform.Spec.Services.JobService.Source != nil &&
d.platform.Spec.Services.JobService.Source.Ref != nil {
+ return d.platform.Spec.Services.JobService.Source
+ }
+ return GetPlatformBroker(d.platform)
+}
+
+func (d JobServiceHandler) GetSink() *duckv1.Destination {
+ if d.platform.Spec.Services.JobService.Sink != nil {
+ return d.platform.Spec.Services.JobService.Source
+ }
+ return GetPlatformBroker(d.platform)
+}
+
+func (j *JobServiceHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
error) {
+ broker := j.GetSourceBroker()
+ sink := j.GetSink()
+ namespace := platform.Namespace
+ resultObjs := []client.Object{}
+
+ if broker != nil {
+ brokerName := broker.Ref.Name
+ jobCreateTrigger := &eventingv1.Trigger{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "jobs-service-create-job-trigger",
Review Comment:
What is the naming strategy for the Jobs Service instance? Maybe we should
append that name here.
##########
workflowproj/operator.go:
##########
@@ -41,6 +41,8 @@ const (
LabelService = metadata.Domain + "/service"
// LabelWorkflow specialized label managed by the controller
LabelWorkflow = metadata.Domain + "/workflow-app"
+ // LabelWorkflowNamespace specialized label managed by the controller
indicating the namespace of the workflow
+ LabelWorkflowNamespace = metadata.Domain + "/workflow-namespace"
Review Comment:
Can you refresh my memory here? Why do we need this? The
`metadata.namespace` already takes care of this info.
##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -96,25 +100,47 @@ func EnsureDeployment(original *appsv1.Deployment, object
*appsv1.Deployment) er
object.Spec.Replicas = original.Spec.Replicas
object.Spec.Selector = original.Spec.Selector
object.Labels = original.GetLabels()
+ object.Finalizers = original.Finalizers
// Clean up the volumes, they are inherited from original, additional
are added by other visitors
- object.Spec.Template.Spec.Volumes = nil
+ // However, the knative mount path must be preserved
+ var kneVol *corev1.Volume = nil
Review Comment:
Please keep this in a different mutator.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the
platform referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ }
+ return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef
*duckv1.KReference) (*unstructured.Unstructured, error) {
+ dynamicClient, err := dynamic.NewForConfig(cfg)
+ if err != nil {
+ return nil, err
+ }
+ gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+ resourceId := schema.GroupVersionResource{
+ Group: gv.Group,
+ Version: gv.Version,
+ Resource: kRef.Kind,
+ }
+ if len(kRef.Namespace) == 0 {
+ return nil, fmt.Errorf("namespace for knative resource %s is
missing", kRef.Name)
+ }
+ list, err :=
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx,
metav1.ListOptions{})
+ fmt.Printf("list:%v", list)
+ return
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx,
kRef.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func IsKnativeEnvInjected(ctx context.Context, c client.Client,
deploymentName, namespace string) (bool, error) {
+ deployment := &appsv1.Deployment{}
+ if err := c.Get(ctx, types.NamespacedName{Name: deploymentName,
Namespace: namespace}, deployment); err != nil {
+ if errors.IsNotFound(err) {
+ return false, nil //deployment not found
+ }
+ return false, err
+ }
+ for _, env := range deployment.Spec.Template.Spec.Containers[0].Env {
+ if env.Name == KSink {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool {
+ if plf.Spec.Services != nil {
+ if plf.Spec.Services.DataIndex != nil {
+ return
pointer.BoolDeref(plf.Spec.Services.DataIndex.Enabled, false)
+ }
+ return false
+ }
+ // Check if DataIndex is enabled in the platform status
+ if plf.Status.ClusterPlatformRef != nil &&
plf.Status.ClusterPlatformRef.Services != nil &&
plf.Status.ClusterPlatformRef.Services.DataIndexRef != nil &&
len(plf.Status.ClusterPlatformRef.Services.DataIndexRef.Url) > 0 {
+ return true
+ }
+ return false
+}
+
+func IsJobServiceEnabled(plf *operatorapi.SonataFlowPlatform) bool {
+ if plf.Spec.Services != nil {
+ if plf.Spec.Services.JobService != nil {
+ return
pointer.BoolDeref(plf.Spec.Services.JobService.Enabled, false)
+ }
+ return false
+ }
+ // Check if JobService is enabled in the platform status
+ if plf.Status.ClusterPlatformRef != nil &&
plf.Status.ClusterPlatformRef.Services != nil &&
plf.Status.ClusterPlatformRef.Services.JobServiceRef != nil &&
len(plf.Status.ClusterPlatformRef.Services.JobServiceRef.Url) > 0 {
+ return true
+ }
+ return false
Review Comment:
Can you please move the functions `IsDataIndexEnabled` and
`IsJobServiceEnabled` to the `controllers/platform` module?
##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -154,7 +180,7 @@ func ServiceMutateVisitor(workflow *operatorapi.SonataFlow)
MutateVisitor {
}
}
-func ManagedPropertiesMutateVisitor(ctx context.Context, catalog
discovery.ServiceCatalog,
+func ManagedPropertiesMutateVisitor(ctx context.Context, c client.Client,
catalog discovery.ServiceCatalog,
Review Comment:
Don't need to pass the `client` around.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +102,100 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(ctx context.Context, c client.Client, workflow
*operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform)
(*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil {
+ if pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform
broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := c.Get(ctx, types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the
platform referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ }
+ return nil, nil
+}
+
+const knativeBrokerAnnotation = "eventing.knative.dev/broker.class"
+
+func GetKnativeResource(ctx context.Context, cfg *rest.Config, kRef
*duckv1.KReference) (*unstructured.Unstructured, error) {
+ dynamicClient, err := dynamic.NewForConfig(cfg)
+ if err != nil {
+ return nil, err
+ }
+ gv, err := schema.ParseGroupVersion(kRef.APIVersion)
+ if err != nil {
+ return nil, err
+ }
+ resourceId := schema.GroupVersionResource{
+ Group: gv.Group,
+ Version: gv.Version,
+ Resource: kRef.Kind,
+ }
+ if len(kRef.Namespace) == 0 {
+ return nil, fmt.Errorf("namespace for knative resource %s is
missing", kRef.Name)
+ }
+ list, err :=
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).List(ctx,
metav1.ListOptions{})
+ fmt.Printf("list:%v", list)
+ return
dynamicClient.Resource(resourceId).Namespace(kRef.Namespace).Get(ctx,
kRef.Name, metav1.GetOptions{})
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func IsKnativeEnvInjected(ctx context.Context, c client.Client,
deploymentName, namespace string) (bool, error) {
Review Comment:
You don't need to pass `Client` over, you can use this:
https://github.com/apache/incubator-kie-kogito-serverless-operator/blob/main/utils/client.go#L25
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]