This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 95a453a86444f46115990841c0ceb82e9d380c43 Author: Luca Burgazzoli <[email protected]> AuthorDate: Tue Jun 8 12:18:54 2021 +0200 kamelet binding: use the Kamelet EIP for steps #2370 --- pkg/controller/kameletbinding/common.go | 42 ++++++++++++--- pkg/resources/resources.go | 8 --- pkg/util/bindings/api.go | 2 + pkg/util/bindings/kamelet.go | 23 +++++---- pkg/util/bindings/kamelet_test.go | 92 +++++++++++++++++++++++++++++++++ 5 files changed, 142 insertions(+), 25 deletions(-) diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go index 7591f80..18725fe 100644 --- a/pkg/controller/kameletbinding/common.go +++ b/pkg/controller/kameletbinding/common.go @@ -36,6 +36,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var endpointTypeSourceContext = bindings.EndpointContext{Type: v1alpha1.EndpointTypeSource} +var endpointTypeSinkContext = bindings.EndpointContext{Type: v1alpha1.EndpointTypeSink} + func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) { controller := true blockOwnerDeletion := true @@ -83,11 +86,11 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * Profile: profile, } - from, err := bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeSource}, kameletbinding.Spec.Source) + from, err := bindings.Translate(bindingContext, endpointTypeSourceContext, kameletbinding.Spec.Source) if err != nil { return nil, errors.Wrap(err, "could not determine source URI") } - to, err := bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeSink}, kameletbinding.Spec.Sink) + to, err := bindings.Translate(bindingContext, endpointTypeSinkContext, kameletbinding.Spec.Sink) if err != nil { return nil, errors.Wrap(err, "could not determine sink URI") } @@ -110,6 +113,18 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * steps = append(steps, stepBinding) } + if to.Step == nil && to.URI == "" { + return nil, errors.Errorf("illegal step definition for sink step: either Step or URI should be provided") + } + if from.URI == "" { + return nil, errors.Errorf("illegal step definition for source step: URI should be provided") + } + for index, step := range steps { + if step.Step == nil && step.URI == "" { + return nil, errors.Errorf("illegal step definition for step %d: either Step or URI should be provided", index) + } + } + allBindings := make([]*bindings.Binding, 0, len(steps)+3) allBindings = append(allBindings, from) allBindings = append(allBindings, steps...) @@ -145,13 +160,24 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * dslSteps := make([]map[string]interface{}, 0) for _, step := range steps { - dslSteps = append(dslSteps, map[string]interface{}{ - "to": step.URI, - }) + s := step.Step + if s == nil { + s = map[string]interface{}{ + "to": step.URI, + } + } + + dslSteps = append(dslSteps, s) } - dslSteps = append(dslSteps, map[string]interface{}{ - "to": to.URI, - }) + + s := to.Step + if s == nil { + s = map[string]interface{}{ + "to": to.URI, + } + } + + dslSteps = append(dslSteps, s) flowFrom := map[string]interface{}{ "from": map[string]interface{}{ diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go index b018400..d7e9c58 100644 --- a/pkg/resources/resources.go +++ b/pkg/resources/resources.go @@ -464,13 +464,6 @@ var assets = func() http.FileSystem { compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x7d\x4b\x77\xdb\x3a\x96\xee\x3c\xbf\x82\xeb\x64\xd2\xbd\x6e\x09\x75\x4e\x52\xb7\x4f\xdf\xdc\x91\x2d\xc7\x89\x1d\xcb\x71\x22\x57\x92\xaa\xc9\x59\x10\x09\x51\xb0\x48\x82\x06\x40\x59\xce\xaf\xef\x85\x07\x9f\x52\x36\x1f\x06\xd4\x1a\x88\x20\xb1\xf1\x6d\xec\x0f\x0f\x12\xef\xd7\xc1\xcc\xdd\xef\xd5\xeb\xe0\x86\x86\x24\x13\x24\x0a\x24\x0b\xe4\x86\x04\x67\x39\x0e\x37\x24\x58\xb2\xb5\x7c\xc2\x9c\x04\x97\xac\xc8\x22\x2c\x29\x [...] }, - "/camel-catalog-1.9.0-SNAPSHOT.yaml": &vfsgen۰CompressedFileInfo{ - name: "camel-catalog-1.9.0-SNAPSHOT.yaml", - modTime: time.Time{}, - uncompressedSize: 89713, - - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x7d\x5b\x77\xdb\x38\xb6\xe6\x7b\x7e\x05\x57\xe5\xe5\x9c\x35\x2d\x74\x55\xaa\xe7\xd4\x4c\xcd\x93\x23\xc7\x89\x1d\xcb\x71\x22\x77\x92\xee\x97\x5a\x10\x09\x49\xb0\x48\x82\x06\x40\x59\xce\xaf\x9f\x05\x10\xbc\x4a\xd9\xbc\x78\xc3\xad\x07\xf1\x82\x8d\x6f\x63\x7f\xb8\x10\x77\xbc\x0e\x66\x78\xbf\x57\xaf\x83\x6b\x1e\xb2\x54\xb1\x28\xd0\x22\xd0\x5b\x16\x9c\x65\x34\xdc\xb2\x60\x29\xd6\xfa\x91\x4a\x16\x5c\x88\x3c\x8d\xa8\xe6\x [...] - }, "/traits.yaml": &vfsgen۰CompressedFileInfo{ name: "traits.yaml", modTime: time.Time{}, @@ -482,7 +475,6 @@ var assets = func() http.FileSystem { fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{ fs["/addons"].(os.FileInfo), fs["/camel-catalog-1.7.0.yaml"].(os.FileInfo), - fs["/camel-catalog-1.9.0-SNAPSHOT.yaml"].(os.FileInfo), fs["/crd"].(os.FileInfo), fs["/default"].(os.FileInfo), fs["/manager"].(os.FileInfo), diff --git a/pkg/util/bindings/api.go b/pkg/util/bindings/api.go index 14f09b8..c844d1e 100644 --- a/pkg/util/bindings/api.go +++ b/pkg/util/bindings/api.go @@ -37,6 +37,8 @@ const ( type Binding struct { // URI is the Camel URI equivalent URI string + // Step is to support complex mapping such as Camel's EIPs + Step map[string]interface{} // Traits is a partial trait specification that should be merged into the integration Traits map[string]v1.TraitSpec // ApplicationProperties contain properties that should be set on the integration for the binding to work diff --git a/pkg/util/bindings/kamelet.go b/pkg/util/bindings/kamelet.go index 9d3f596..d52f144 100644 --- a/pkg/util/bindings/kamelet.go +++ b/pkg/util/bindings/kamelet.go @@ -44,7 +44,6 @@ func (k KameletBindingProvider) Translate(ctx BindingContext, endpointCtx Endpoi // it translates only Kamelet refs if e.Ref.Kind == v1alpha1.KameletKind && gv.Group == v1alpha1.SchemeGroupVersion.Group { kameletName := url.PathEscape(e.Ref.Name) - kameletURI := fmt.Sprintf("kamelet:%s", kameletName) props, err := e.Properties.GetPropertyMap() if err != nil { @@ -57,21 +56,27 @@ func (k KameletBindingProvider) Translate(ctx BindingContext, endpointCtx Endpoi } else { id = endpointCtx.GenerateID() } - kameletURI = fmt.Sprintf("%s/%s", kameletURI, url.PathEscape(id)) - var applicationProperties map[string]string + binding := Binding{} + if endpointCtx.Type == v1alpha1.EndpointTypeAction { + binding.Step = map[string]interface{}{ + "kamelet": map[string]interface{}{ + "name": fmt.Sprintf("%s/%s", kameletName, url.PathEscape(id)), + }, + } + } else { + binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id)) + } + if len(props) > 0 { - applicationProperties = make(map[string]string, len(props)) + binding.ApplicationProperties = make(map[string]string, len(props)) for k, v := range props { propKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, id, k) - applicationProperties[propKey] = v + binding.ApplicationProperties[propKey] = v } } - return &Binding{ - URI: kameletURI, - ApplicationProperties: applicationProperties, - }, nil + return &binding, nil } return nil, nil } diff --git a/pkg/util/bindings/kamelet_test.go b/pkg/util/bindings/kamelet_test.go new file mode 100644 index 0000000..53d7887 --- /dev/null +++ b/pkg/util/bindings/kamelet_test.go @@ -0,0 +1,92 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bindings + +import ( + "context" + "fmt" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + corev1 "k8s.io/api/core/v1" + + camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util/test" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestKameletBindingProvider(t *testing.T) { + testcases := []struct { + endpointType v1alpha1.EndpointType + uri string + step map[string]interface{} + }{ + { + endpointType: v1alpha1.EndpointTypeSource, + uri: "kamelet:mykamelet/source", + step: nil, + }, + { + endpointType: v1alpha1.EndpointTypeAction, + uri: "", + step: map[string]interface{}{ + "kamelet": map[string]interface{}{ + "name": "mykamelet/action", + }, + }, + }, + { + endpointType: v1alpha1.EndpointTypeSink, + uri: "kamelet:mykamelet/sink", + step: nil, + }, + } + + for i, tc := range testcases { + t.Run(fmt.Sprintf("test-%d-%s", i, tc.uri), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := test.NewFakeClient() + assert.NoError(t, err) + + endpoint := v1alpha1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Kamelet", + APIVersion: "camel.apache.org/v1any1", + Name: "mykamelet", + }, + } + + binding, err := KameletBindingProvider{}.Translate( + BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKubernetes, + }, + EndpointContext{Type: tc.endpointType}, + endpoint) + + assert.NoError(t, err) + assert.NotNil(t, binding) + assert.Equal(t, tc.step, binding.Step) + assert.Equal(t, tc.uri, binding.URI) + }) + } + +}
