This is an automated email from the ASF dual-hosted git repository.
cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push:
new d558b0d18 fix(#5402): Evaluate Knative profile based on
Serving/Eventing installed
d558b0d18 is described below
commit d558b0d18751fca2963509f8e192ea3d90575bdc
Author: Christoph Deppisch <[email protected]>
AuthorDate: Wed Apr 24 19:34:11 2024 +0200
fix(#5402): Evaluate Knative profile based on Serving/Eventing installed
- Use Knative profile when Serving or Eventing is installed on cluster
- Make sure to enable Knative trait when Serving or Eventing is installed
- Enable knative-service trait only when Knative Serving is installed
- Garbage collect Serving and Eventing resources in gc trait only when
Serving/Eventing is installed on the cluster
- Do not use a Knative Serving service in the Knative trigger when Serving is
not installed on the cluster
- Use an arbitrary Kubernetes Service as the subscriber in the Knative trigger
when Serving is not available
---
pkg/controller/integration/platform_setup.go | 2 +-
pkg/controller/kameletbinding/integration.go | 2 +-
pkg/controller/pipe/integration.go | 2 +-
pkg/trait/gc.go | 45 +++---
pkg/trait/knative.go | 31 +++-
pkg/trait/knative_service.go | 6 +
pkg/trait/knative_service_test.go | 75 ++++++++++
pkg/trait/knative_test.go | 205 +++++++++++++++++++++++++++
pkg/util/knative/enabled.go | 17 ++-
pkg/util/knative/knative.go | 32 +++--
pkg/util/kubernetes/collection.go | 34 ++++-
pkg/util/test/client.go | 87 +++++++-----
12 files changed, 469 insertions(+), 69 deletions(-)
diff --git a/pkg/controller/integration/platform_setup.go
b/pkg/controller/integration/platform_setup.go
index 2c578276d..e1851f769 100644
--- a/pkg/controller/integration/platform_setup.go
+++ b/pkg/controller/integration/platform_setup.go
@@ -97,7 +97,7 @@ func determineBestTraitProfile(c client.Client, integration
*v1.Integration, p *
// Use platform spec profile if set
return p.Spec.Profile, nil
}
- if ok, err := knative.IsServingInstalled(c); err != nil {
+ if ok, err := knative.IsInstalled(c); err != nil {
return "", err
} else if ok {
return v1.TraitProfileKnative, nil
diff --git a/pkg/controller/kameletbinding/integration.go
b/pkg/controller/kameletbinding/integration.go
index 1f4630277..b28780de8 100644
--- a/pkg/controller/kameletbinding/integration.go
+++ b/pkg/controller/kameletbinding/integration.go
@@ -248,7 +248,7 @@ func determineTraitProfile(ctx context.Context, c
client.Client, binding *v1alph
return pl.Spec.Profile, nil
}
}
- if ok, err := knative.IsServingInstalled(c); err != nil {
+ if ok, err := knative.IsInstalled(c); err != nil {
return "", err
} else if ok {
return v1.TraitProfileKnative, nil
diff --git a/pkg/controller/pipe/integration.go
b/pkg/controller/pipe/integration.go
index 80b06d662..aa2a9c4be 100644
--- a/pkg/controller/pipe/integration.go
+++ b/pkg/controller/pipe/integration.go
@@ -248,7 +248,7 @@ func determineTraitProfile(ctx context.Context, c
client.Client, binding *v1.Pip
return pl.Spec.Profile, nil
}
}
- if ok, err := knative.IsServingInstalled(c); err != nil {
+ if ok, err := knative.IsInstalled(c); err != nil {
return "", err
} else if ok {
return v1.TraitProfileKnative, nil
diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go
index e3e5e79e8..2670de3fd 100644
--- a/pkg/trait/gc.go
+++ b/pkg/trait/gc.go
@@ -47,6 +47,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
"github.com/apache/camel-k/v2/pkg/util"
+ "github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/log"
)
@@ -86,20 +87,6 @@ var (
Version: batchv1.SchemeGroupVersion.Version,
}: {},
}
- deletableTypesByProfile =
map[v1.TraitProfile]map[schema.GroupVersionKind]struct{}{
- v1.TraitProfileKnative: {
- schema.GroupVersionKind{
- Kind: "Service",
- Group: "serving.knative.dev",
- Version: "v1",
- }: {},
- schema.GroupVersionKind{
- Kind: "Trigger",
- Group: "eventing.knative.dev",
- Version: "v1",
- }: {},
- },
- }
)
type gcTrait struct {
@@ -160,12 +147,30 @@ func (t *gcTrait) garbageCollectResources(e *Environment)
error {
}
profile := e.DetermineProfile()
- if profileDeletableTypes, ok := deletableTypesByProfile[profile]; ok {
- // copy profile related deletable types if not already present
- for key, value := range profileDeletableTypes {
- if _, found := deletableGVKs[key]; !found {
- deletableGVKs[key] = value
- }
+ deletableTypesByProfile := map[schema.GroupVersionKind]struct{}{}
+
+ if profile == v1.TraitProfileKnative {
+ if ok, _ := knative.IsServingInstalled(e.Client); ok {
+ deletableTypesByProfile[schema.GroupVersionKind{
+ Kind: "Service",
+ Group: "serving.knative.dev",
+ Version: "v1",
+ }] = struct{}{}
+ }
+
+ if ok, _ := knative.IsEventingInstalled(e.Client); ok {
+ deletableTypesByProfile[schema.GroupVersionKind{
+ Kind: "Trigger",
+ Group: "eventing.knative.dev",
+ Version: "v1",
+ }] = struct{}{}
+ }
+ }
+
+ // copy profile related deletable types if not already present
+ for key, value := range deletableTypesByProfile {
+ if _, found := deletableGVKs[key]; !found {
+ deletableGVKs[key] = value
}
}
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 3328ec650..a6a032a9d 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -331,7 +331,9 @@ func (t *knativeTrait) configureEvents(e *Environment, env
*knativeapi.CamelEnvi
serviceName = "default"
}
servicePath := fmt.Sprintf("/events/%s", eventType)
- t.createTrigger(e, ref, eventType, servicePath)
+ if triggerErr := t.createTrigger(e, ref, eventType,
servicePath); triggerErr != nil {
+ return triggerErr
+ }
if !env.ContainsService(serviceName,
knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent,
ref.APIVersion, ref.Kind) {
svc := knativeapi.CamelServiceDefinition{
@@ -475,7 +477,7 @@ func (t *knativeTrait) configureSinkBinding(e *Environment,
env *knativeapi.Came
return err
}
-func (t *knativeTrait) createTrigger(e *Environment, ref
*corev1.ObjectReference, eventType string, path string) {
+func (t *knativeTrait) createTrigger(e *Environment, ref
*corev1.ObjectReference, eventType string, path string) error {
// TODO extend to additional filters too, to filter them at source and
not at destination
found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger)
bool {
return trigger.Spec.Broker == ref.Name &&
@@ -486,9 +488,32 @@ func (t *knativeTrait) createTrigger(e *Environment, ref
*corev1.ObjectReference
if ref.Namespace == "" {
ref.Namespace = e.Integration.Namespace
}
- trigger := knativeutil.CreateTrigger(*ref, e.Integration.Name,
eventType, path)
+
+ controllerStrategy, err := e.DetermineControllerStrategy()
+ if err != nil {
+ return err
+ }
+
+ var trigger *eventing.Trigger
+ switch controllerStrategy {
+ case ControllerStrategyKnativeService:
+ trigger, err =
knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType,
path)
+ if err != nil {
+ return err
+ }
+ case ControllerStrategyDeployment:
+ trigger, err = knativeutil.CreateServiceTrigger(*ref,
e.Integration.Name, eventType, path)
+ if err != nil {
+ return err
+ }
+ default:
+ return fmt.Errorf("failed to create Knative trigger:
unsupported controller strategy %s", controllerStrategy)
+ }
+
e.Resources.Add(trigger)
}
+
+ return nil
}
func (t *knativeTrait) ifServiceMissingDo(
diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go
index a8e53a779..0fee5f9da 100644
--- a/pkg/trait/knative_service.go
+++ b/pkg/trait/knative_service.go
@@ -30,6 +30,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
"github.com/apache/camel-k/v2/pkg/metadata"
+ "github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
)
@@ -149,6 +150,11 @@ func (t *knativeServiceTrait) SelectControllerStrategy(e
*Environment) (*Control
return nil, nil
}
+ // Knative serving is required
+ if ok, _ := knative.IsServingInstalled(e.Client); !ok {
+ return nil, nil
+ }
+
var sources []v1.SourceSpec
var err error
if sources, err = kubernetes.ResolveIntegrationSources(e.Ctx, t.Client,
e.Integration, e.Resources); err != nil {
diff --git a/pkg/trait/knative_service_test.go
b/pkg/trait/knative_service_test.go
index a4e79f7dc..658b3101e 100644
--- a/pkg/trait/knative_service_test.go
+++ b/pkg/trait/knative_service_test.go
@@ -414,6 +414,81 @@ func TestKnativeServiceNotApplicable(t *testing.T) {
}))
}
+func TestKnativeServiceNoServingAvailable(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ require.NoError(t, err)
+
+ client, _ := test.NewFakeClient()
+ fakeClient := client.(*test.FakeClient) //nolint
+ fakeClient.DisableKnativeServing()
+
+ traitCatalog := NewCatalog(nil)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Client: client,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: KnativeServiceTestName,
+ Namespace: KnativeServiceTestNamespace,
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseDeploying,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "routes.js",
+ Content:
`from("platform-http:test").log("hello")`,
+ },
+ Language: v1.LanguageJavaScript,
+ },
+ },
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy:
v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry:
v1.RegistrySpec{Address: "registry"},
+ RuntimeVersion:
catalog.Runtime.Version,
+ },
+ },
+ Status: v1.IntegrationPlatformStatus{
+ Phase: v1.IntegrationPlatformPhaseReady,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: kubernetes.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ // don't care about conditions in this unit test
+ _, err = traitCatalog.apply(&environment)
+
+ require.NoError(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+ assert.Nil(t, environment.GetTrait("knative-service"))
+
+ assert.Nil(t, environment.Resources.GetKnativeService(func(service
*serving.Service) bool {
+ return service.Name == KnativeServiceTestName
+ }))
+
+ assert.NotNil(t, environment.Resources.GetDeployment(func(deployment
*appsv1.Deployment) bool {
+ return deployment.Name == KnativeServiceTestName
+ }))
+}
+
func TestKnativeServiceWithRollout(t *testing.T) {
environment := createKnativeServiceTestEnvironment(t,
&traitv1.KnativeServiceTrait{RolloutDuration: "60s"})
assert.NotEmpty(t, environment.ExecutedTraits)
diff --git a/pkg/trait/knative_test.go b/pkg/trait/knative_test.go
index 75d075620..454331ecf 100644
--- a/pkg/trait/knative_test.go
+++ b/pkg/trait/knative_test.go
@@ -147,6 +147,14 @@ func TestKnativeEnvConfigurationFromTrait(t *testing.T) {
eEventSink := ne.FindService("default",
knativeapi.CamelEndpointKindSink, knativeapi.CamelServiceTypeEvent,
"eventing.knative.dev/v1", "Broker")
assert.NotNil(t, eEventSink)
assert.Equal(t, "http://broker-default.host/", eEventSink.URL)
+
+ assert.NotNil(t,
environment.Resources.GetKnativeSubscription(func(subscription
*messaging.Subscription) bool {
+ return assert.Equal(t, "channel-source-1-test",
subscription.Name)
+ }))
+
+ assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger
*eventing.Trigger) bool {
+ return assert.Equal(t, "default-test", trigger.Name)
+ }))
}
func TestKnativeEnvConfigurationFromSource(t *testing.T) {
@@ -253,6 +261,203 @@ func TestKnativeEnvConfigurationFromSource(t *testing.T) {
broker := ne.FindService("evt.type",
knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, "", "")
assert.NotNil(t, broker)
assert.Equal(t, "false",
broker.Metadata[knativeapi.CamelMetaKnativeReply])
+
+ assert.NotNil(t,
environment.Resources.GetKnativeSubscription(func(subscription
*messaging.Subscription) bool {
+ return assert.Equal(t, "channel-source-1-test",
subscription.Name)
+ }))
+
+ assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger
*eventing.Trigger) bool {
+ return assert.Equal(t, "default-test-evttype", trigger.Name)
+ }))
+}
+
+func TestKnativeTriggerConfiguration(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ require.NoError(t, err)
+
+ c, err := NewFakeClient("ns")
+ require.NoError(t, err)
+
+ traitCatalog := NewCatalog(c)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Client: c,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test",
+ Namespace: "ns",
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseDeploying,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "route.java",
+ Content: `
+ public class
CartoonMessagesMover extends RouteBuilder {
+ public
void configure() {
+
from("knative:event/evt.type")
+
.log("${body}");
+ }
+ }
+ `,
+ },
+ Language: v1.LanguageJavaSource,
+ },
+ },
+ Traits: v1.Traits{
+ Knative: &traitv1.KnativeTrait{
+ Trait: traitv1.Trait{
+ Enabled:
pointer.Bool(true),
+ },
+ },
+ },
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy:
v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry:
v1.RegistrySpec{Address: "registry"},
+ RuntimeVersion:
catalog.Runtime.Version,
+ },
+ Profile: v1.TraitProfileKnative,
+ },
+ Status: v1.IntegrationPlatformStatus{
+ Phase: v1.IntegrationPlatformPhaseReady,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: k8sutils.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ // don't care about conditions in this unit test
+ _, err = traitCatalog.apply(&environment)
+
+ require.NoError(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+ assert.NotNil(t, environment.GetTrait("knative"))
+
+ assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger
*eventing.Trigger) bool {
+ matching := true
+
+ matching = matching && assert.Equal(t, "default",
trigger.Spec.Broker)
+ matching = matching && assert.Equal(t,
serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion)
+ matching = matching && assert.Equal(t, "Service",
trigger.Spec.Subscriber.Ref.Kind)
+ matching = matching && assert.Equal(t, "/events/evt.type",
trigger.Spec.Subscriber.URI.Path)
+ matching = matching && assert.Equal(t, "default-test-evttype",
trigger.Name)
+
+ return matching
+ }))
+}
+
+func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) {
+ catalog, err := camel.DefaultCatalog()
+ require.NoError(t, err)
+
+ c, err := NewFakeClient("ns")
+ require.NoError(t, err)
+
+ fakeClient := c.(*test.FakeClient) //nolint
+ fakeClient.DisableKnativeServing()
+
+ traitCatalog := NewCatalog(c)
+
+ environment := Environment{
+ CamelCatalog: catalog,
+ Catalog: traitCatalog,
+ Client: c,
+ Integration: &v1.Integration{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test",
+ Namespace: "ns",
+ },
+ Status: v1.IntegrationStatus{
+ Phase: v1.IntegrationPhaseDeploying,
+ },
+ Spec: v1.IntegrationSpec{
+ Profile: v1.TraitProfileKnative,
+ Sources: []v1.SourceSpec{
+ {
+ DataSpec: v1.DataSpec{
+ Name: "route.java",
+ Content: `
+ public class
CartoonMessagesMover extends RouteBuilder {
+ public
void configure() {
+
from("knative:event/evt.type")
+
.log("${body}");
+ }
+ }
+ `,
+ },
+ Language: v1.LanguageJavaSource,
+ },
+ },
+ Traits: v1.Traits{
+ Knative: &traitv1.KnativeTrait{
+ Trait: traitv1.Trait{
+ Enabled:
pointer.Bool(true),
+ },
+ },
+ },
+ },
+ },
+ IntegrationKit: &v1.IntegrationKit{
+ Status: v1.IntegrationKitStatus{
+ Phase: v1.IntegrationKitPhaseReady,
+ },
+ },
+ Platform: &v1.IntegrationPlatform{
+ Spec: v1.IntegrationPlatformSpec{
+ Cluster: v1.IntegrationPlatformClusterOpenShift,
+ Build: v1.IntegrationPlatformBuildSpec{
+ PublishStrategy:
v1.IntegrationPlatformBuildPublishStrategyS2I,
+ Registry:
v1.RegistrySpec{Address: "registry"},
+ RuntimeVersion:
catalog.Runtime.Version,
+ },
+ Profile: v1.TraitProfileKnative,
+ },
+ Status: v1.IntegrationPlatformStatus{
+ Phase: v1.IntegrationPlatformPhaseReady,
+ },
+ },
+ EnvVars: make([]corev1.EnvVar, 0),
+ ExecutedTraits: make([]Trait, 0),
+ Resources: k8sutils.NewCollection(),
+ }
+ environment.Platform.ResyncStatusFullConfig()
+
+ // don't care about conditions in this unit test
+ _, err = traitCatalog.apply(&environment)
+
+ require.NoError(t, err)
+ assert.NotEmpty(t, environment.ExecutedTraits)
+ assert.NotNil(t, environment.GetTrait("knative"))
+
+ assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger
*eventing.Trigger) bool {
+ matching := true
+
+ matching = matching && assert.Equal(t, "default",
trigger.Spec.Broker)
+ matching = matching && assert.Equal(t, "v1",
trigger.Spec.Subscriber.Ref.APIVersion)
+ matching = matching && assert.Equal(t, "Service",
trigger.Spec.Subscriber.Ref.Kind)
+ matching = matching && assert.Equal(t, "/events/evt.type",
trigger.Spec.Subscriber.URI.Path)
+ matching = matching && assert.Equal(t, "default-test-evttype",
trigger.Name)
+
+ return matching
+ }))
}
func TestKnativePlatformHttpConfig(t *testing.T) {
diff --git a/pkg/util/knative/enabled.go b/pkg/util/knative/enabled.go
index 0a6b6ced2..3cb503d5f 100644
--- a/pkg/util/knative/enabled.go
+++ b/pkg/util/knative/enabled.go
@@ -28,7 +28,7 @@ import (
// IsRefKindInstalled returns true if the cluster has the referenced Kind
installed.
func IsRefKindInstalled(c kubernetes.Interface, ref corev1.ObjectReference)
(bool, error) {
- if installed, err := isInstalled(c,
ref.GroupVersionKind().GroupVersion()); err != nil {
+ if installed, err := isServerResourceAvailable(c,
ref.GroupVersionKind().GroupVersion()); err != nil {
return false, err
} else if installed {
return true, nil
@@ -36,6 +36,19 @@ func IsRefKindInstalled(c kubernetes.Interface, ref
corev1.ObjectReference) (boo
return false, nil
}
+// IsInstalled returns true if we are connected to a cluster with either
Knative Serving or Eventing installed.
+func IsInstalled(c kubernetes.Interface) (bool, error) {
+ if ok, err := IsServingInstalled(c); ok {
+ return ok, err
+ } else if ok, err = IsEventingInstalled(c); ok {
+ return ok, err
+ } else if err != nil {
+ return false, err
+ }
+
+ return false, nil
+}
+
// IsServingInstalled returns true if we are connected to a cluster with
Knative Serving installed.
func IsServingInstalled(c kubernetes.Interface) (bool, error) {
return IsRefKindInstalled(c, corev1.ObjectReference{
@@ -52,7 +65,7 @@ func IsEventingInstalled(c kubernetes.Interface) (bool,
error) {
})
}
-func isInstalled(c kubernetes.Interface, api schema.GroupVersion) (bool,
error) {
+func isServerResourceAvailable(c kubernetes.Interface, api
schema.GroupVersion) (bool, error) {
_, err := c.Discovery().ServerResourcesForGroupVersion(api.String())
if err != nil && (k8serrors.IsNotFound(err) ||
util.IsUnknownAPIError(err)) {
return false, nil
diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go
index 1d5b7462a..b72ae1024 100644
--- a/pkg/util/knative/knative.go
+++ b/pkg/util/knative/knative.go
@@ -75,7 +75,27 @@ func CreateSubscription(channelReference
corev1.ObjectReference, serviceName str
}
}
-func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string,
eventType string, path string) *eventing.Trigger {
+// CreateServiceTrigger creates a Knative trigger with an arbitrary Kubernetes
Service as the subscriber - usually used when no Knative Serving is available on
the cluster.
+func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName
string, eventType string, path string) (*eventing.Trigger, error) {
+ subscriberRef := duckv1.KReference{
+ APIVersion: "v1",
+ Kind: "Service",
+ Name: serviceName,
+ }
+ return CreateTrigger(brokerReference, subscriberRef, eventType, path)
+}
+
+// CreateKnativeServiceTrigger creates a Knative trigger with a Knative Serving
Service as the subscriber - the default option when Knative Serving is available on
the cluster.
+func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference,
serviceName string, eventType string, path string) (*eventing.Trigger, error) {
+ subscriberRef := duckv1.KReference{
+ APIVersion: serving.SchemeGroupVersion.String(),
+ Kind: "Service",
+ Name: serviceName,
+ }
+ return CreateTrigger(brokerReference, subscriberRef, eventType, path)
+}
+
+func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef
duckv1.KReference, eventType string, path string) (*eventing.Trigger, error) {
nameSuffix := ""
var attributes map[string]string
if eventType != "" {
@@ -91,7 +111,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference,
serviceName string, e
},
ObjectMeta: metav1.ObjectMeta{
Namespace: brokerReference.Namespace,
- Name: brokerReference.Name + "-" + serviceName +
nameSuffix,
+ Name: brokerReference.Name + "-" +
subscriberRef.Name + nameSuffix,
},
Spec: eventing.TriggerSpec{
Filter: &eventing.TriggerFilter{
@@ -99,17 +119,13 @@ func CreateTrigger(brokerReference corev1.ObjectReference,
serviceName string, e
},
Broker: brokerReference.Name,
Subscriber: duckv1.Destination{
- Ref: &duckv1.KReference{
- APIVersion:
serving.SchemeGroupVersion.String(),
- Kind: "Service",
- Name: serviceName,
- },
+ Ref: &subscriberRef,
URI: &apis.URL{
Path: path,
},
},
},
- }
+ }, nil
}
func CreateSinkBinding(source corev1.ObjectReference, target
corev1.ObjectReference) *sources.SinkBinding {
diff --git a/pkg/util/kubernetes/collection.go
b/pkg/util/kubernetes/collection.go
index dd3fcaf1a..190d884d2 100644
--- a/pkg/util/kubernetes/collection.go
+++ b/pkg/util/kubernetes/collection.go
@@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ messaging "knative.dev/eventing/pkg/apis/messaging/v1"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
@@ -266,7 +267,7 @@ func (c *Collection) GetServiceForIntegration(integration
*v1.Integration) *core
})
}
-// GetKnativeService returns a knative Service that matches the given function.
+// GetKnativeService returns a Knative Service that matches the given function.
func (c *Collection) GetKnativeService(filter func(*serving.Service) bool)
*serving.Service {
var retValue *serving.Service
c.VisitKnativeService(func(re *serving.Service) {
@@ -277,6 +278,28 @@ func (c *Collection) GetKnativeService(filter
func(*serving.Service) bool) *serv
return retValue
}
+// GetKnativeTrigger returns a Knative Trigger that matches the given function.
+func (c *Collection) GetKnativeTrigger(filter func(*eventing.Trigger) bool)
*eventing.Trigger {
+ var retValue *eventing.Trigger
+ c.VisitKnativeTrigger(func(re *eventing.Trigger) {
+ if filter(re) {
+ retValue = re
+ }
+ })
+ return retValue
+}
+
+// GetKnativeSubscription returns a Knative channel Subscription that matches
the given function.
+func (c *Collection) GetKnativeSubscription(filter func(subscription
*messaging.Subscription) bool) *messaging.Subscription {
+ var retValue *messaging.Subscription
+ c.VisitKnativeSubscription(func(re *messaging.Subscription) {
+ if filter(re) {
+ retValue = re
+ }
+ })
+ return retValue
+}
+
// VisitRoute executes the visitor function on all Route resources.
func (c *Collection) VisitRoute(visitor func(*routev1.Route)) {
c.Visit(func(res runtime.Object) {
@@ -357,6 +380,15 @@ func (c *Collection) VisitKnativeTrigger(visitor
func(trigger *eventing.Trigger)
})
}
+// VisitKnativeSubscription executes the visitor function on all Knative
channel Subscription resources.
+func (c *Collection) VisitKnativeSubscription(visitor func(trigger
*messaging.Subscription)) {
+ c.Visit(func(res runtime.Object) {
+ if conv, ok := res.(*messaging.Subscription); ok {
+ visitor(conv)
+ }
+ })
+}
+
// HasKnativeTrigger returns true if a Knative trigger respecting filter is
found.
func (c *Collection) HasKnativeTrigger(filter func(trigger *eventing.Trigger)
bool) bool {
var retValue *bool
diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go
index a821bf81d..b86976eb8 100644
--- a/pkg/util/test/client.go
+++ b/pkg/util/test/client.go
@@ -113,10 +113,12 @@ func NewFakeClient(initObjs ...runtime.Object)
(client.Client, error) {
})
return &FakeClient{
- Client: c,
- Interface: clientset,
- camel: camelClientset,
- scales: &fakescaleclient,
+ Client: c,
+ Interface: clientset,
+ camel: camelClientset,
+ scales: &fakescaleclient,
+ enabledKnativeServing: true,
+ enabledKnativeEventing: true,
}, nil
}
@@ -138,10 +140,12 @@ func filterObjects(scheme *runtime.Scheme, input
[]runtime.Object, filter func(g
type FakeClient struct {
controller.Client
kubernetes.Interface
- camel *fakecamelclientset.Clientset
- scales *fakescale.FakeScaleClient
- disabledGroups []string
- enabledOpenshift bool
+ camel *fakecamelclientset.Clientset
+ scales *fakescale.FakeScaleClient
+ disabledGroups []string
+ enabledOpenshift bool
+ enabledKnativeServing bool
+ enabledKnativeEventing bool
}
func (c *FakeClient) Intercept(intercept *interceptor.Funcs) {
@@ -191,6 +195,14 @@ func (c *FakeClient) EnableOpenshiftDiscovery() {
c.enabledOpenshift = true
}
+func (c *FakeClient) DisableKnativeServing() {
+ c.enabledKnativeServing = false
+}
+
+func (c *FakeClient) DisableKnativeEventing() {
+ c.enabledKnativeEventing = false
+}
+
func (c *FakeClient) AuthorizationV1()
authorizationv1.AuthorizationV1Interface {
return &FakeAuthorization{
AuthorizationV1Interface: c.Interface.AuthorizationV1(),
@@ -201,9 +213,11 @@ func (c *FakeClient) AuthorizationV1()
authorizationv1.AuthorizationV1Interface
func (c *FakeClient) Discovery() discovery.DiscoveryInterface {
return &FakeDiscovery{
- DiscoveryInterface: c.Interface.Discovery(),
- disabledGroups: c.disabledGroups,
- enabledOpenshift: c.enabledOpenshift,
+ DiscoveryInterface: c.Interface.Discovery(),
+ disabledGroups: c.disabledGroups,
+ enabledOpenshift: c.enabledOpenshift,
+ enabledKnativeServing: c.enabledKnativeServing,
+ enabledKnativeEventing: c.enabledKnativeEventing,
}
}
@@ -229,8 +243,10 @@ func (f *FakeAuthorization) SelfSubjectRulesReviews()
authorizationv1.SelfSubjec
type FakeDiscovery struct {
discovery.DiscoveryInterface
- disabledGroups []string
- enabledOpenshift bool
+ disabledGroups []string
+ enabledOpenshift bool
+ enabledKnativeServing bool
+ enabledKnativeEventing bool
}
func (f *FakeDiscovery) ServerResourcesForGroupVersion(groupVersion string)
(*metav1.APIResourceList, error) {
@@ -247,26 +263,33 @@ func (f *FakeDiscovery)
ServerResourcesForGroupVersion(groupVersion string) (*me
}
}
- // used to verify if knative is installed
- if groupVersion == "serving.knative.dev/v1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
- return &metav1.APIResourceList{
- GroupVersion: "serving.knative.dev/v1",
- }, nil
- }
- if groupVersion == "eventing.knative.dev/v1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
- return &metav1.APIResourceList{
- GroupVersion: "eventing.knative.dev/v1",
- }, nil
- }
- if groupVersion == "messaging.knative.dev/v1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
- return &metav1.APIResourceList{
- GroupVersion: "messaging.knative.dev/v1",
- }, nil
+ // used to verify if Knative Serving is installed
+ if f.enabledKnativeServing {
+ if groupVersion == "serving.knative.dev/v1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
+ return &metav1.APIResourceList{
+ GroupVersion: "serving.knative.dev/v1",
+ }, nil
+ }
}
- if groupVersion == "messaging.knative.dev/v1beta1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
- return &metav1.APIResourceList{
- GroupVersion: "messaging.knative.dev/v1beta1",
- }, nil
+
+ // used to verify if Knative Eventing is installed
+ if f.enabledKnativeEventing {
+ if groupVersion == "eventing.knative.dev/v1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
+ return &metav1.APIResourceList{
+ GroupVersion: "eventing.knative.dev/v1",
+ }, nil
+ }
+ if groupVersion == "messaging.knative.dev/v1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
+ return &metav1.APIResourceList{
+ GroupVersion: "messaging.knative.dev/v1",
+ }, nil
+ }
+ if groupVersion == "messaging.knative.dev/v1beta1" &&
!util.StringSliceExists(f.disabledGroups, groupVersion) {
+ return &metav1.APIResourceList{
+ GroupVersion: "messaging.knative.dev/v1beta1",
+ }, nil
+ }
}
+
return f.DiscoveryInterface.ServerResourcesForGroupVersion(groupVersion)
}