This is an automated email from the ASF dual-hosted git repository.
wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git
The following commit(s) were added to refs/heads/main by this push:
new 25be0205dce kie-tools-2910: Update workflow metadata upon deployment
events (#3030)
25be0205dce is described below
commit 25be0205dceccc4dbbf2490ec613d9d2fe8783cb
Author: Walter Medvedeo <[email protected]>
AuthorDate: Wed Mar 26 16:10:42 2025 +0100
kie-tools-2910: Update workflow metadata upon deployment events (#3030)
---
packages/sonataflow-operator/cmd/main.go | 9 +++++
.../controller/eventing/workflowdef_events.go | 2 +-
.../internal/controller/knative/knative.go | 45 ++++++++++++++++++++++
packages/sonataflow-operator/utils/client.go | 13 +++++++
4 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/packages/sonataflow-operator/cmd/main.go
b/packages/sonataflow-operator/cmd/main.go
index 5a524863c3e..11c6b169c40 100644
--- a/packages/sonataflow-operator/cmd/main.go
+++ b/packages/sonataflow-operator/cmd/main.go
@@ -22,8 +22,11 @@ package main
import (
"crypto/tls"
"flag"
+ "fmt"
"os"
+ "k8s.io/client-go/dynamic"
+
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/manager"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
@@ -149,6 +152,12 @@ func main() {
// Set global assessors
utils.SetIsOpenShift(mgr.GetConfig())
utils.SetClient(mgr.GetClient())
+ cli, err := dynamic.NewForConfig(mgr.GetConfig())
+ if err != nil {
+ // Shouldn't fail, since the config is provided by the cluster; if it were invalid, SetIsOpenShift would probably have failed before.
+ panic(fmt.Sprintf("Impossible to get new dynamic client for
config to support controller operations: %s", err))
+ }
+ utils.SetDynamicClient(cli)
// Fail fast, we can change this behavior in the future to read from
defaults instead.
if _, err = cfg.InitializeControllersCfgAt(controllerCfgPath); err !=
nil {
diff --git
a/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
b/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
index 73a1651b541..8dad2e93cb7 100644
---
a/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
+++
b/packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go
@@ -60,7 +60,7 @@ func GetWorkflowDefinitionEventsTargetURL(cli client.Client,
workflow *operatora
}
if sink != nil {
// Workflow is connected via with knative eventing by using an
operator managed SinkBinding.
- if sinkURI, err :=
knative.GetSinkBindingSinkURI(workflow.Name+"-sb", workflow.Namespace); err !=
nil {
+ if sinkURI, err := knative.GetSinkURI(*sink); err != nil {
return "", err
} else {
uri = sinkURI.String()
diff --git
a/packages/sonataflow-operator/internal/controller/knative/knative.go
b/packages/sonataflow-operator/internal/controller/knative/knative.go
index 3886363b696..1e004c9330b 100644
--- a/packages/sonataflow-operator/internal/controller/knative/knative.go
+++ b/packages/sonataflow-operator/internal/controller/knative/knative.go
@@ -24,6 +24,14 @@ import (
"fmt"
"strings"
+ "knative.dev/pkg/resolver"
+
+ "knative.dev/pkg/tracker"
+
+ "knative.dev/pkg/injection/clients/dynamicclient"
+
+ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
+
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
@@ -65,6 +73,31 @@ const (
KafkaKnativeEventingDeliveryOrderOrdered = "ordered"
)
+// noOpTracker is a no-operation tracker for query operations based on resolver.URIResolver that don't require any
+// resource tracking, but only need the URL to be resolved.
+// Note: the knative team was asked, and it's valid to use a dummy tracker while still benefiting from the URI resolution.
+// See: resolver.URIResolver
+type noOpTracker struct{}
+
+// Compile-time check that noOpTracker provides every method the resolver expects from a tracker.
+var _ tracker.Interface = noOpTracker{}
+
+// Track ignores the given reference and object and always returns nil.
+func (noOpTracker) Track(_ corev1.ObjectReference, _ interface{}) error {
+	return nil
+}
+
+// TrackReference ignores the given reference and object and always returns nil.
+func (noOpTracker) TrackReference(_ tracker.Reference, _ interface{}) error {
+	return nil
+}
+
+// OnChanged does nothing.
+func (noOpTracker) OnChanged(_ interface{}) {
+}
+
+// GetObservers always returns nil, since nothing is ever tracked.
+func (noOpTracker) GetObservers(_ interface{}) []types.NamespacedName {
+	return nil
+}
+
+// OnDeletedObserver does nothing.
+func (noOpTracker) OnDeletedObserver(_ interface{}) {
+}
+
func GetKnativeServingClient(cfg *rest.Config)
(clientservingv1.ServingV1Interface, error) {
if servingClient == nil {
if knServingClient, err := NewKnativeServingClient(cfg); err !=
nil {
@@ -306,3 +339,15 @@ func GetSinkBindingSinkURI(name, namespace string)
(*apis.URL, error) {
}
return sb.Status.SinkURI, nil
}
+
+// GetSinkURI returns the address of the sink referred by a Destination.
+//
+// The resolution is delegated to a resolver.URIResolver built with a noOpTracker, using the operator's
+// dynamic client (see utils.GetDynamicClient) injected into the context.
+//
+// It returns an error when the dynamic client has not been set, or when the resolver fails to resolve
+// the destination.
+func GetSinkURI(destination duckv1.Destination) (*apis.URL, error) {
+	dynamicClient := utils.GetDynamicClient()
+	if dynamicClient == nil {
+		// A nil *dynamic.DynamicClient stored in the context would be a non-nil interface value and would
+		// only fail later, as a nil pointer dereference; report it as an error instead.
+		return nil, fmt.Errorf("resolving sink URI: dynamic client is not initialized")
+	}
+	// The dynamic client must be in the context before the addressable duck is attached to it.
+	ctx := context.WithValue(context.TODO(), dynamicclient.Key{}, dynamicClient)
+	ctx = addressable.WithDuck(ctx)
+	// No resource tracking is required here, only the URI resolution.
+	uriResolver := resolver.NewURIResolverFromTracker(ctx, &noOpTracker{})
+	url, err := uriResolver.URIFromDestinationV1(ctx, destination, nil)
+	if err != nil {
+		return nil, err
+	}
+	return url, nil
+}
diff --git a/packages/sonataflow-operator/utils/client.go
b/packages/sonataflow-operator/utils/client.go
index 3f7001835c3..1d984c31c59 100644
--- a/packages/sonataflow-operator/utils/client.go
+++ b/packages/sonataflow-operator/utils/client.go
@@ -19,11 +19,13 @@ package utils
import (
"k8s.io/client-go/discovery"
+ "k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var k8sClient client.Client
+var k8sDynamicClient *dynamic.DynamicClient
var discoveryClient discovery.DiscoveryInterface
// TODO: consider refactor the internals as we progress adding features to
rely on this client instead of passing it through all the functions
@@ -39,6 +41,17 @@ func SetClient(client client.Client) {
k8sClient = client
}
+// GetDynamicClient returns the default dynamic client created by the main operator's thread.
+// It's safe to use since it's set when the operator main function runs.
+func GetDynamicClient() *dynamic.DynamicClient {
+	return k8sDynamicClient
+}
+
+// SetDynamicClient stores the given dynamic client in the package-level k8sDynamicClient variable.
+// It is meant for internal use only. Don't call it!
+func SetDynamicClient(cli *dynamic.DynamicClient) {
+ k8sDynamicClient = cli
+}
+
func GetDiscoveryClient(cfg *rest.Config) (discovery.DiscoveryInterface,
error) {
if discoveryClient == nil {
if cli, err := discovery.NewDiscoveryClientForConfig(cfg); err
!= nil {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]