ricardozanini commented on code in PR #467:
URL:
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/467#discussion_r1624932800
##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config)
(*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
+ if discoveryClient == nil {
+ if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err
!= nil {
+ return nil, err
+ } else {
+ discoveryClient = cli
+ }
+ }
+ return discoveryClient, nil
+}
+
+func SetDisvoveryClient(cli discovery.DiscoveryInterface) {
Review Comment:
```suggestion
func SetDiscoveryClient(cli discovery.DiscoveryInterface) {
```
##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil && pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the platform
referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
Review Comment:
can you please move these to private `const`s?
##########
controllers/profiles/dev/states_dev.go:
##########
@@ -117,7 +117,7 @@ func (e *ensureRunningWorkflowState) Do(ctx
context.Context, workflow *operatora
}
objs = append(objs, route)
- if knativeObjs, err :=
common.NewKnativeEventingHandler(e.StateSupport).Ensure(ctx, workflow); err !=
nil {
+ if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport,
pl).Ensure(ctx, workflow); err != nil {
Review Comment:
Do you need to pass `StateSupport` here?
##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config)
(*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
Review Comment:
I'm not sure if this is the right place for this function since it's not
related directly to Knative. We have two options: either move it to the
`utils/kubernetes` package or make it private if only Knative is using it. But
IIRC, our service discovery use cases also use it.
##########
api/condition_types.go:
##########
@@ -39,6 +39,8 @@ const (
SucceedConditionType ConditionType = "Succeed"
// BuiltConditionType describes the condition of a resource that needs
to be build.
BuiltConditionType ConditionType = "Built"
+ // DeployedConditionType describes the condition of a resource that
needs to be deployed.
+ DeployedConditionType ConditionType = "Deployed"
Review Comment:
I'd avoid adding one more condition to the status since it requires a whole
new status handling in the controller reconciliation algorithm.
We do the reconciliation based on the status in the `CanReconcile`. By
adding an additional condition type, we will then have to consider it in every
cycle to avoid missing a combination of statuses and, as a result, not
reconciling a given object at all.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil && pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the platform
referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
+ for _, volume := range source.Volumes {
+ if volume.Name == KnativeBundleVolume {
+ kubeutil.AddOrReplaceVolume(dest, volume)
+ break
+ }
+ }
+ visitContainers(source, func(container *corev1.Container) {
+ visitContainers(dest, func(destContainer *corev1.Container) {
+ for _, mount := range container.VolumeMounts {
+ if mount.Name == KnativeBundleVolume {
+
kubeutil.AddOrReplaceVolumeMount(destContainer, mount)
+ break
+ }
+ }
+ for _, env := range container.Env {
+ if env.Name == KSink || env.Name ==
KCeOverRides {
+
kubeutil.AddOrReplaceEnvVar(destContainer, env)
+ }
+ }
+ })
+ })
+}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+ for i := 0; i < len(vols)-1; i++ {
+ if vols[i].Name == KnativeBundleVolume {
+ vols[i], vols[i+1] = vols[i+1], vols[i]
+ }
+ }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+ for i := 0; i < len(mounts)-1; i++ {
+ if mounts[i].Name == KnativeBundleVolume {
+ mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+ }
+ }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The
volume and volume mount
Review Comment:
```suggestion
// Knative Sinkbinding injects K_SINK env, a volume and volume mount. The
volume and volume mount
```
##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config)
(*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
+ if discoveryClient == nil {
+ if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err
!= nil {
+ return nil, err
+ } else {
+ discoveryClient = cli
+ }
+ }
+ return discoveryClient, nil
+}
+
+func SetDisvoveryClient(cli discovery.DiscoveryInterface) {
+ discoveryClient = cli
+}
+
func GetKnativeAvailability(cfg *rest.Config) (*Availability, error) {
- if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err != nil {
+ if cli, err := GetDisvoveryClient(cfg); err != nil {
Review Comment:
```suggestion
if cli, err := GetDiscoveryClient(cfg); err != nil {
```
##########
controllers/knative/knative.go:
##########
@@ -20,20 +20,37 @@
package knative
import (
+ "context"
+ "fmt"
+
+ operatorapi
"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
+ "github.com/apache/incubator-kie-kogito-serverless-operator/utils"
+ kubeutil
"github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
clienteventingv1
"knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
clientservingv1
"knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
)
var servingClient clientservingv1.ServingV1Interface
var eventingClient clienteventingv1.EventingV1Interface
+var discoveryClient discovery.DiscoveryInterface
type Availability struct {
Eventing bool
Serving bool
}
+const (
+ KSink = "K_SINK"
+ KnativeBundleVolume = "kne-bundle-volume"
+ KCeOverRides = "K_CE_OVERRIDES"
+)
Review Comment:
Are we using this publicly somewhere else?
##########
controllers/profiles/common/constants/platform_services.go:
##########
@@ -20,21 +20,29 @@
package constants
const (
- QuarkusHTTP = "quarkus-http"
-
+ QuarkusHTTP = "quarkus-http"
+ Post = "POST"
ConfigMapWorkflowPropsVolumeName = "workflow-properties"
- JobServiceRequestEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
- JobServiceRequestEventsConnector =
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
- JobServiceStatusChangeEvents =
"kogito.jobs-service.http.job-status-change-events"
- JobServiceStatusChangeEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
- JobServiceURLProtocol = "http"
- JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url"
- JobServiceJobEventsPath = "/v2/jobs/events"
+ JobServiceRequestEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
+ JobServiceRequestEventsMethod =
"mp.messaging.outgoing.kogito-job-service-job-request-events.method"
+ JobServiceRequestEventsConnector =
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
+ JobServiceStatusChangeEvents =
"kogito.jobs-service.http.job-status-change-events"
+ JobServiceStatusChangeEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
+ JobServiceStatusChangeEventsConnector =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.connector"
+ JobServiceStatusChangeEventsMethod =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.method"
+ JobServiceURLProtocol = "http"
+ JobServiceDataSourceReactiveURL =
"quarkus.datasource.reactive.url"
+ JobServiceJobEventsPath = "/v2/jobs/events"
Review Comment:
You have this value being used in the file above.
##########
controllers/platform/services/knative.go:
##########
@@ -0,0 +1,48 @@
+// Copyright 2024 Apache Software Foundation (ASF)
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
Review Comment:
Can you please move these functions to `services.go`? I don't see
Knative-related code in these functions.
##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil && pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the platform
referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
+ for _, volume := range source.Volumes {
+ if volume.Name == KnativeBundleVolume {
+ kubeutil.AddOrReplaceVolume(dest, volume)
+ break
+ }
+ }
+ visitContainers(source, func(container *corev1.Container) {
+ visitContainers(dest, func(destContainer *corev1.Container) {
+ for _, mount := range container.VolumeMounts {
+ if mount.Name == KnativeBundleVolume {
+
kubeutil.AddOrReplaceVolumeMount(destContainer, mount)
+ break
+ }
+ }
+ for _, env := range container.Env {
+ if env.Name == KSink || env.Name ==
KCeOverRides {
+
kubeutil.AddOrReplaceEnvVar(destContainer, env)
+ }
+ }
+ })
+ })
+}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+ for i := 0; i < len(vols)-1; i++ {
+ if vols[i].Name == KnativeBundleVolume {
+ vols[i], vols[i+1] = vols[i+1], vols[i]
+ }
+ }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+ for i := 0; i < len(mounts)-1; i++ {
+ if mounts[i].Name == KnativeBundleVolume {
+ mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+ }
+ }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volumn mount. The
volume and volume mount
+// must be in the end of the array to avoid repeadly restarting of the
workflow pod
+func RestoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
+ moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
+ visitContainers(&deployment.Spec.Template.Spec, func(container
*corev1.Container) {
+ moveKnativeVolumeMountToEnd(container.VolumeMounts)
+ })
+}
+
+// ContainerVisitor is called with each container
+type ContainerVisitor func(container *corev1.Container)
Review Comment:
Can you make this type private since it seems to be used only by
`visitContainers`? Nice implementation btw! 👍
##########
controllers/platform/services/services.go:
##########
@@ -505,3 +552,167 @@ func mergeContainerPreservingEnvVars(dest
*corev1.Container, source *corev1.Cont
}
return nil
}
+
+// GetPlatformBroker gets the default broker for the platform.
+func GetPlatformBroker(platform *operatorapi.SonataFlowPlatform)
*duckv1.Destination {
+ if platform != nil && platform.Spec.Eventing != nil &&
platform.Spec.Eventing.Broker != nil {
+ return platform.Spec.Eventing.Broker
+ }
+ return nil
+}
+
+func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
+ if d.platform != nil && d.platform.Spec.Services.DataIndex.Source !=
nil && d.platform.Spec.Services.DataIndex.Source.Ref != nil {
+ return d.platform.Spec.Services.DataIndex.Source
+ }
+ return GetPlatformBroker(d.platform)
+}
+
+func (d *DataIndexHandler) newTrigger(labels map[string]string, brokerName,
namespace, platformName, serviceName, tag, eventType, path string)
*eventingv1.Trigger {
+ return &eventingv1.Trigger{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%s-data-index-%s-trigger",
platformName, tag),
+ Namespace: namespace,
+ Labels: labels,
+ },
+ Spec: eventingv1.TriggerSpec{
+ Broker: brokerName,
+ Filter: &eventingv1.TriggerFilter{
+ Attributes: eventingv1.TriggerFilterAttributes{
+ "type": eventType,
+ },
+ },
+ Subscriber: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ Name: serviceName,
+ Namespace: namespace,
+ APIVersion: "v1",
+ Kind: "Service",
+ },
+ URI: &apis.URL{
+ Path: path,
+ },
+ },
+ },
+ }
+}
+func (d *DataIndexHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
error) {
+ broker := d.GetSourceBroker()
+ if broker == nil {
+ return nil, nil // Nothing to do
+ }
+ brokerName := broker.Ref.Name
+ namespace := platform.Namespace
+ serviceName := d.GetServiceName()
+ return []client.Object{
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-error", "ProcessInstanceErrorDataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-node", "ProcessInstanceNodeDataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-sla", "ProcessInstanceSLADataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-state", "ProcessInstanceStateDataEvent", pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-variable", "ProcessInstanceVariableDataEvent",
pathProcesses),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "process-definition", "ProcessDefinitionEvent", pathDefinitions),
+ d.newTrigger(lbl, brokerName, namespace, platform.Name,
serviceName, "jobs", "JobEvent", pathJobs)}, nil
+}
+
+func (d JobServiceHandler) GetSourceBroker() *duckv1.Destination {
+ if d.platform.Spec.Services.JobService.Source != nil &&
d.platform.Spec.Services.JobService.Source.Ref != nil {
+ return d.platform.Spec.Services.JobService.Source
+ }
+ return GetPlatformBroker(d.platform)
+}
+
+func (d JobServiceHandler) GetSink() *duckv1.Destination {
+ if d.platform.Spec.Services.JobService.Sink != nil {
+ return d.platform.Spec.Services.JobService.Sink
+ }
+ return GetPlatformBroker(d.platform)
+}
+
+func (j *JobServiceHandler) GenerateKnativeResources(platform
*operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object,
error) {
+ broker := j.GetSourceBroker()
+ sink := j.GetSink()
+ namespace := platform.Namespace
+ resultObjs := []client.Object{}
+
+ if broker != nil {
+ brokerName := broker.Ref.Name
+ jobCreateTrigger := &eventingv1.Trigger{
+ ObjectMeta: metav1.ObjectMeta{
+ Name:
fmt.Sprintf("%s-jobs-service-create-job-trigger", platform.Name),
+ Namespace: namespace,
+ Labels: lbl,
+ },
+ Spec: eventingv1.TriggerSpec{
+ Broker: brokerName,
+ Filter: &eventingv1.TriggerFilter{
+ Attributes:
eventingv1.TriggerFilterAttributes{
+ "type": "job.create",
+ },
+ },
+ Subscriber: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ Name: j.GetServiceName(),
+ Namespace: namespace,
+ APIVersion: "v1",
+ Kind: "Service",
+ },
+ URI: &apis.URL{
+ Path: "/v2/jobs/events",
Review Comment:
Can you move this to a private const?
##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil && pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the platform
referred by the cluster platform")
Review Comment:
Don't we have a function doing this in the `controllers/platform` module? If
not, I think we can add it there.
##########
controllers/profiles/common/constants/platform_services.go:
##########
@@ -20,21 +20,29 @@
package constants
const (
- QuarkusHTTP = "quarkus-http"
-
+ QuarkusHTTP = "quarkus-http"
+ Post = "POST"
ConfigMapWorkflowPropsVolumeName = "workflow-properties"
- JobServiceRequestEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
- JobServiceRequestEventsConnector =
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
- JobServiceStatusChangeEvents =
"kogito.jobs-service.http.job-status-change-events"
- JobServiceStatusChangeEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
- JobServiceURLProtocol = "http"
- JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url"
- JobServiceJobEventsPath = "/v2/jobs/events"
+ JobServiceRequestEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-request-events.url"
+ JobServiceRequestEventsMethod =
"mp.messaging.outgoing.kogito-job-service-job-request-events.method"
+ JobServiceRequestEventsConnector =
"mp.messaging.outgoing.kogito-job-service-job-request-events.connector"
+ JobServiceStatusChangeEvents =
"kogito.jobs-service.http.job-status-change-events"
+ JobServiceStatusChangeEventsURL =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
+ JobServiceStatusChangeEventsConnector =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.connector"
+ JobServiceStatusChangeEventsMethod =
"mp.messaging.outgoing.kogito-job-service-job-status-events-http.method"
+ JobServiceURLProtocol = "http"
+ JobServiceDataSourceReactiveURL =
"quarkus.datasource.reactive.url"
+ JobServiceJobEventsPath = "/v2/jobs/events"
+ KogitoProcessEventsProtocol = "http"
Review Comment:
Why not rename it to `DefaultHTTPProtocol` and have one const only? This
seems the same as `JobServiceURLProtocol`.
##########
controllers/profiles/dev/object_creators_dev.go:
##########
@@ -95,7 +96,7 @@ func deploymentMutateVisitor(workflow
*operatorapi.SonataFlow, plf *operatorapi.
}
}
-func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow)
common.MutateVisitor {
+func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow,
support *common.StateSupport) common.MutateVisitor {
Review Comment:
```suggestion
func ensureWorkflowDefConfigMapMutator(workflow *operatorapi.SonataFlow)
common.MutateVisitor {
```
I think we don't need this attribute anymore.
##########
controllers/profiles/common/ensurer.go:
##########
@@ -176,6 +224,11 @@ func ensureObject(ctx context.Context, workflow
*operatorapi.SonataFlow, visitor
return visitorErr
}
}
+ if workflow.Namespace != object.GetNamespace() {
+ // This is for Knative trigger in a different
namespace
+ // Set the finalizer for trigger cleanup when
the workflow is deleted
+ return setWorkflowFinalizer(ctx, c, workflow)
Review Comment:
Can you please do another check to verify if this object is a knative one?
##########
controllers/profiles/dev/profile_dev_test.go:
##########
@@ -391,6 +404,7 @@ func Test_VolumeWithCapitalizedPaths(t *testing.T) {
workflow :=
test.GetSonataFlow(test.SonataFlowGreetingsWithStaticResourcesCR, t.Name())
client :=
test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow,
configMap).WithStatusSubresource(workflow, configMap).Build()
+ knative.SetDisvoveryClient(test.CreateFakeKnativeDiscoveryClient())
Review Comment:
This name will be replaced with `SetDiscoveryClient` once you accept my
changes.
##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -134,7 +138,7 @@ func EnsureKService(original *servingv1.Service, object
*servingv1.Service) erro
}
// we do a merge to not keep changing the spec since k8s will set
default values to the podSpec
- return mergo.Merge(&object.Spec.Template.Spec.PodSpec,
original.Spec.Template.Spec.PodSpec, mergo.WithOverride)
+ return mergo.Merge(&object.Spec.Template.Spec.PodSpec,
original.Spec.Template.Spec.PodSpec /*, mergo.WithOverride*/)
Review Comment:
Can you remove this comment? I'm assuming this is not required anymore.
##########
controllers/knative/knative.go:
##########
@@ -64,8 +81,23 @@ func NewKnativeEventingClient(cfg *rest.Config)
(*clienteventingv1.EventingV1Cli
return clienteventingv1.NewForConfig(cfg)
}
+func GetDisvoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
Review Comment:
```suggestion
func GetDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
```
##########
controllers/profiles/dev/states_dev.go:
##########
@@ -62,7 +62,7 @@ func (e *ensureRunningWorkflowState) CanReconcile(workflow
*operatorapi.SonataFl
func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow
*operatorapi.SonataFlow) (ctrl.Result, []client.Object, error) {
var objs []client.Object
- flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx,
workflow, ensureWorkflowDefConfigMapMutator(workflow))
+ flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx,
workflow, ensureWorkflowDefConfigMapMutator(workflow, e.StateSupport))
Review Comment:
```suggestion
flowDefCM, _, err := e.ensurers.definitionConfigMap.Ensure(ctx,
workflow, ensureWorkflowDefConfigMapMutator(workflow))
```
##########
controllers/profiles/preview/states_preview.go:
##########
@@ -210,7 +211,15 @@ func (h *deployWithBuildWorkflowState) Do(ctx
context.Context, workflow *operato
}
// didn't change, business as usual
- return NewDeploymentReconciler(h.StateSupport,
h.ensurers).reconcileWithImage(ctx, workflow, build.Status.ImageTag)
+ result, objs, err := NewDeploymentReconciler(h.StateSupport,
h.ensurers).reconcileWithImage(ctx, workflow, build.Status.ImageTag)
+ if err != nil {
+ workflow.Status.Manager().MarkFalse(api.DeployedConditionType,
api.DeploymentFailureReason, fmt.Sprintf("Error in deploy the workflow:%s",
err))
+ _, err = h.PerformStatusUpdate(ctx, workflow)
+ return result, nil, err
+ }
+ workflow.Status.Manager().MarkTrue(api.DeployedConditionType)
Review Comment:
So this is the only place we set this condition to `true`. Can't we keep
using `Running`? Otherwise, if we were to introduce this new status, we need to
make sure that we set it accordingly in every state.

| State | Status | Description |
|-------|--------|-------------|
| Running | TRUE | The workflow is running |
| Deployed | TRUE | The workflow has been deployed |
| Built | FALSE | The workflow has not been built |

So, we need to justify and think about this matrix. If it's not deployed,
then it's not running either, right? That's what I meant by introducing
additional complexity to the algorithm by adding a new condition.
##########
controllers/profiles/common/mutate_visitors.go:
##########
@@ -96,8 +96,12 @@ func EnsureDeployment(original *appsv1.Deployment, object
*appsv1.Deployment) er
object.Spec.Replicas = original.Spec.Replicas
object.Spec.Selector = original.Spec.Selector
object.Labels = original.GetLabels()
+ object.Finalizers = original.Finalizers
// Clean up the volumes, they are inherited from original, additional
are added by other visitors
+ // However, the knative data (voulmes, volumes mounts) must be preserved
Review Comment:
```suggestion
// However, the knative data (volumes, volumes mounts) must be preserved
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]