jianrongzhang89 commented on code in PR #467:
URL:
https://github.com/apache/incubator-kie-kogito-serverless-operator/pull/467#discussion_r1627707505
##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil && pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the platform
referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
+ for _, volume := range source.Volumes {
+ if volume.Name == KnativeBundleVolume {
+ kubeutil.AddOrReplaceVolume(dest, volume)
+ break
+ }
+ }
+ visitContainers(source, func(container *corev1.Container) {
+ visitContainers(dest, func(destContainer *corev1.Container) {
+ for _, mount := range container.VolumeMounts {
+ if mount.Name == KnativeBundleVolume {
+
kubeutil.AddOrReplaceVolumeMount(destContainer, mount)
+ break
+ }
+ }
+ for _, env := range container.Env {
+ if env.Name == KSink || env.Name ==
KCeOverRides {
+
kubeutil.AddOrReplaceEnvVar(destContainer, env)
+ }
+ }
+ })
+ })
+}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+ for i := 0; i < len(vols)-1; i++ {
+ if vols[i].Name == KnativeBundleVolume {
+ vols[i], vols[i+1] = vols[i+1], vols[i]
+ }
+ }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+ for i := 0; i < len(mounts)-1; i++ {
+ if mounts[i].Name == KnativeBundleVolume {
+ mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+ }
+ }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volume mount. The
volume and volume mount
Review Comment:
Corrected
##########
controllers/knative/knative.go:
##########
@@ -84,3 +116,95 @@ func GetKnativeAvailability(cfg *rest.Config)
(*Availability, error) {
return result, nil
}
}
+
+func GetWorkflowSink(workflow *operatorapi.SonataFlow, pl
*operatorapi.SonataFlowPlatform) (*duckv1.Destination, error) {
+ if workflow == nil {
+ return nil, nil
+ }
+ if workflow.Spec.Sink != nil {
+ return workflow.Spec.Sink, nil
+ }
+ if pl != nil && pl.Spec.Eventing != nil {
+ // no sink defined in the workflow, use the platform broker
+ return pl.Spec.Eventing.Broker, nil
+ } else if pl.Status.ClusterPlatformRef != nil {
+ // Find the platform referred by the cluster platform
+ platform := &operatorapi.SonataFlowPlatform{}
+ if err := utils.GetClient().Get(context.TODO(),
types.NamespacedName{Namespace:
pl.Status.ClusterPlatformRef.PlatformRef.Namespace, Name:
pl.Status.ClusterPlatformRef.PlatformRef.Name}, platform); err != nil {
+ return nil, fmt.Errorf("error reading the platform
referred by the cluster platform")
+ }
+ if platform.Spec.Eventing != nil {
+ return platform.Spec.Eventing.Broker, nil
+ }
+ }
+ return nil, nil
+}
+
+func IsKnativeBroker(kRef *duckv1.KReference) bool {
+ return kRef.APIVersion == "eventing.knative.dev/v1" && kRef.Kind ==
"Broker"
+}
+
+func SaveKnativeData(dest *corev1.PodSpec, source *corev1.PodSpec) {
+ for _, volume := range source.Volumes {
+ if volume.Name == KnativeBundleVolume {
+ kubeutil.AddOrReplaceVolume(dest, volume)
+ break
+ }
+ }
+ visitContainers(source, func(container *corev1.Container) {
+ visitContainers(dest, func(destContainer *corev1.Container) {
+ for _, mount := range container.VolumeMounts {
+ if mount.Name == KnativeBundleVolume {
+
kubeutil.AddOrReplaceVolumeMount(destContainer, mount)
+ break
+ }
+ }
+ for _, env := range container.Env {
+ if env.Name == KSink || env.Name ==
KCeOverRides {
+
kubeutil.AddOrReplaceEnvVar(destContainer, env)
+ }
+ }
+ })
+ })
+}
+
+func moveKnativeVolumeToEnd(vols []corev1.Volume) {
+ for i := 0; i < len(vols)-1; i++ {
+ if vols[i].Name == KnativeBundleVolume {
+ vols[i], vols[i+1] = vols[i+1], vols[i]
+ }
+ }
+}
+
+func moveKnativeVolumeMountToEnd(mounts []corev1.VolumeMount) {
+ for i := 0; i < len(mounts)-1; i++ {
+ if mounts[i].Name == KnativeBundleVolume {
+ mounts[i], mounts[i+1] = mounts[i+1], mounts[i]
+ }
+ }
+}
+
+// Knative Sinkbinding injects K_SINK env, a volume and volume mount. The
volume and volume mount
+// must be at the end of the array to avoid repeatedly restarting the
workflow pod
+func RestoreKnativeVolumeAndVolumeMount(deployment *appsv1.Deployment) {
+ moveKnativeVolumeToEnd(deployment.Spec.Template.Spec.Volumes)
+ visitContainers(&deployment.Spec.Template.Spec, func(container
*corev1.Container) {
+ moveKnativeVolumeMountToEnd(container.VolumeMounts)
+ })
+}
+
+// ContainerVisitor is called with each container
+type ContainerVisitor func(container *corev1.Container)
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]