This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 95a453a86444f46115990841c0ceb82e9d380c43
Author: Luca Burgazzoli <[email protected]>
AuthorDate: Tue Jun 8 12:18:54 2021 +0200

    kamelet binding: use the Kamelet EIP for steps #2370
---
 pkg/controller/kameletbinding/common.go | 42 ++++++++++++---
 pkg/resources/resources.go              |  8 ---
 pkg/util/bindings/api.go                |  2 +
 pkg/util/bindings/kamelet.go            | 23 +++++----
 pkg/util/bindings/kamelet_test.go       | 92 +++++++++++++++++++++++++++++++++
 5 files changed, 142 insertions(+), 25 deletions(-)

diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go
index 7591f80..18725fe 100644
--- a/pkg/controller/kameletbinding/common.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -36,6 +36,9 @@ import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
+var endpointTypeSourceContext = bindings.EndpointContext{Type: v1alpha1.EndpointTypeSource}
+var endpointTypeSinkContext = bindings.EndpointContext{Type: v1alpha1.EndpointTypeSink}
+
 func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *v1alpha1.KameletBinding) (*v1.Integration, error) {
        controller := true
        blockOwnerDeletion := true
@@ -83,11 +86,11 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
                Profile:   profile,
        }
 
-       from, err := bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeSource}, kameletbinding.Spec.Source)
+       from, err := bindings.Translate(bindingContext, endpointTypeSourceContext, kameletbinding.Spec.Source)
        if err != nil {
                return nil, errors.Wrap(err, "could not determine source URI")
        }
-       to, err := bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeSink}, kameletbinding.Spec.Sink)
+       to, err := bindings.Translate(bindingContext, endpointTypeSinkContext, kameletbinding.Spec.Sink)
        if err != nil {
                return nil, errors.Wrap(err, "could not determine sink URI")
        }
@@ -110,6 +113,18 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
                steps = append(steps, stepBinding)
        }
 
+       if to.Step == nil && to.URI == "" {
+               return nil, errors.Errorf("illegal step definition for sink step: either Step or URI should be provided")
+       }
+       if from.URI == "" {
+               return nil, errors.Errorf("illegal step definition for source step: URI should be provided")
+       }
+       for index, step := range steps {
+               if step.Step == nil && step.URI == "" {
+                       return nil, errors.Errorf("illegal step definition for step %d: either Step or URI should be provided", index)
+               }
+       }
+
        allBindings := make([]*bindings.Binding, 0, len(steps)+3)
        allBindings = append(allBindings, from)
        allBindings = append(allBindings, steps...)
@@ -145,13 +160,24 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
 
        dslSteps := make([]map[string]interface{}, 0)
        for _, step := range steps {
-               dslSteps = append(dslSteps, map[string]interface{}{
-                       "to": step.URI,
-               })
+               s := step.Step
+               if s == nil {
+                       s = map[string]interface{}{
+                               "to": step.URI,
+                       }
+               }
+
+               dslSteps = append(dslSteps, s)
        }
-       dslSteps = append(dslSteps, map[string]interface{}{
-               "to": to.URI,
-       })
+
+       s := to.Step
+       if s == nil {
+               s = map[string]interface{}{
+                       "to": to.URI,
+               }
+       }
+
+       dslSteps = append(dslSteps, s)
 
        flowFrom := map[string]interface{}{
                "from": map[string]interface{}{
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index b018400..d7e9c58 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -464,13 +464,6 @@ var assets = func() http.FileSystem {
 
                        compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x7d\x4b\x77\xdb\x3a\x96\xee\x3c\xbf\x82\xeb\x64\xd2\xbd\x6e\x09\x75\x4e\x52\xb7\x4f\xdf\xdc\x91\x2d\xc7\x89\x1d\xcb\x71\x22\x57\x92\xaa\xc9\x59\x10\x09\x51\xb0\x48\x82\x06\x40\x59\xce\xaf\xef\x85\x07\x9f\x52\x36\x1f\x06\xd4\x1a\x88\x20\xb1\xf1\x6d\xec\x0f\x0f\x12\xef\xd7\xc1\xcc\xdd\xef\xd5\xeb\xe0\x86\x86\x24\x13\x24\x0a\x24\x0b\xe4\x86\x04\x67\x39\x0e\x37\x24\x58\xb2\xb5\x7c\xc2\x9c\x04\x97\xac\xc8\x22\x2c\x29\x
 [...]
                },
-               "/camel-catalog-1.9.0-SNAPSHOT.yaml": &vfsgen۰CompressedFileInfo{
-                       name:             "camel-catalog-1.9.0-SNAPSHOT.yaml",
-                       modTime:          time.Time{},
-                       uncompressedSize: 89713,
-
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xc4\x7d\x5b\x77\xdb\x38\xb6\xe6\x7b\x7e\x05\x57\xe5\xe5\x9c\x35\x2d\x74\x55\xaa\xe7\xd4\x4c\xcd\x93\x23\xc7\x89\x1d\xcb\x71\x22\x77\x92\xee\x97\x5a\x10\x09\x49\xb0\x48\x82\x06\x40\x59\xce\xaf\x9f\x05\x10\xbc\x4a\xd9\xbc\x78\xc3\xad\x07\xf1\x82\x8d\x6f\x63\x7f\xb8\x10\x77\xbc\x0e\x66\x78\xbf\x57\xaf\x83\x6b\x1e\xb2\x54\xb1\x28\xd0\x22\xd0\x5b\x16\x9c\x65\x34\xdc\xb2\x60\x29\xd6\xfa\x91\x4a\x16\x5c\x88\x3c\x8d\xa8\xe6\x
 [...]
-               },
                "/traits.yaml": &vfsgen۰CompressedFileInfo{
                        name:             "traits.yaml",
                        modTime:          time.Time{},
@@ -482,7 +475,6 @@ var assets = func() http.FileSystem {
        fs["/"].(*vfsgen۰DirInfo).entries = []os.FileInfo{
                fs["/addons"].(os.FileInfo),
                fs["/camel-catalog-1.7.0.yaml"].(os.FileInfo),
-               fs["/camel-catalog-1.9.0-SNAPSHOT.yaml"].(os.FileInfo),
                fs["/crd"].(os.FileInfo),
                fs["/default"].(os.FileInfo),
                fs["/manager"].(os.FileInfo),
diff --git a/pkg/util/bindings/api.go b/pkg/util/bindings/api.go
index 14f09b8..c844d1e 100644
--- a/pkg/util/bindings/api.go
+++ b/pkg/util/bindings/api.go
@@ -37,6 +37,8 @@ const (
 type Binding struct {
        // URI is the Camel URI equivalent
        URI string
+       // Step is to support complex mapping such as Camel's EIPs
+       Step map[string]interface{}
        // Traits is a partial trait specification that should be merged into the integration
        Traits map[string]v1.TraitSpec
        // ApplicationProperties contain properties that should be set on the integration for the binding to work
diff --git a/pkg/util/bindings/kamelet.go b/pkg/util/bindings/kamelet.go
index 9d3f596..d52f144 100644
--- a/pkg/util/bindings/kamelet.go
+++ b/pkg/util/bindings/kamelet.go
@@ -44,7 +44,6 @@ func (k KameletBindingProvider) Translate(ctx BindingContext, endpointCtx Endpoi
        // it translates only Kamelet refs
        if e.Ref.Kind == v1alpha1.KameletKind && gv.Group == v1alpha1.SchemeGroupVersion.Group {
                kameletName := url.PathEscape(e.Ref.Name)
-               kameletURI := fmt.Sprintf("kamelet:%s", kameletName)
 
                props, err := e.Properties.GetPropertyMap()
                if err != nil {
@@ -57,21 +56,27 @@ func (k KameletBindingProvider) Translate(ctx BindingContext, endpointCtx Endpoi
                } else {
                        id = endpointCtx.GenerateID()
                }
-               kameletURI = fmt.Sprintf("%s/%s", kameletURI, url.PathEscape(id))
 
-               var applicationProperties map[string]string
+               binding := Binding{}
+               if endpointCtx.Type == v1alpha1.EndpointTypeAction {
+                       binding.Step = map[string]interface{}{
+                               "kamelet": map[string]interface{}{
+                                       "name": fmt.Sprintf("%s/%s", kameletName, url.PathEscape(id)),
+                               },
+                       }
+               } else {
+                       binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
+               }
+
                if len(props) > 0 {
-                       applicationProperties = make(map[string]string, len(props))
+                       binding.ApplicationProperties = make(map[string]string, len(props))
                        for k, v := range props {
                                propKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, id, k)
-                               applicationProperties[propKey] = v
+                               binding.ApplicationProperties[propKey] = v
                        }
                }
 
-               return &Binding{
-                       URI:                   kameletURI,
-                       ApplicationProperties: applicationProperties,
-               }, nil
+               return &binding, nil
        }
        return nil, nil
 }
diff --git a/pkg/util/bindings/kamelet_test.go b/pkg/util/bindings/kamelet_test.go
new file mode 100644
index 0000000..53d7887
--- /dev/null
+++ b/pkg/util/bindings/kamelet_test.go
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package bindings
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       corev1 "k8s.io/api/core/v1"
+
+       camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/util/test"
+       "github.com/stretchr/testify/assert"
+       "testing"
+)
+
+func TestKameletBindingProvider(t *testing.T) {
+       testcases := []struct {
+               endpointType v1alpha1.EndpointType
+               uri          string
+               step         map[string]interface{}
+       }{
+               {
+                       endpointType: v1alpha1.EndpointTypeSource,
+                       uri:          "kamelet:mykamelet/source",
+                       step:         nil,
+               },
+               {
+                       endpointType: v1alpha1.EndpointTypeAction,
+                       uri:          "",
+                       step: map[string]interface{}{
+                               "kamelet": map[string]interface{}{
+                                       "name": "mykamelet/action",
+                               },
+                       },
+               },
+               {
+                       endpointType: v1alpha1.EndpointTypeSink,
+                       uri:          "kamelet:mykamelet/sink",
+                       step:         nil,
+               },
+       }
+
+       for i, tc := range testcases {
+               t.Run(fmt.Sprintf("test-%d-%s", i, tc.uri), func(t *testing.T) {
+                       ctx, cancel := context.WithCancel(context.Background())
+                       defer cancel()
+
+                       client, err := test.NewFakeClient()
+                       assert.NoError(t, err)
+
+                       endpoint := v1alpha1.Endpoint{
+                               Ref: &corev1.ObjectReference{
+                                       Kind:       "Kamelet",
+                                       APIVersion: "camel.apache.org/v1any1",
+                                       Name:       "mykamelet",
+                               },
+                       }
+
+                       binding, err := KameletBindingProvider{}.Translate(
+                               BindingContext{
+                                       Ctx:       ctx,
+                                       Client:    client,
+                                       Namespace: "test",
+                                       Profile:   camelv1.TraitProfileKubernetes,
+                               },
+                               EndpointContext{Type: tc.endpointType},
+                               endpoint)
+
+                       assert.NoError(t, err)
+                       assert.NotNil(t, binding)
+                       assert.Equal(t, tc.step, binding.Step)
+                       assert.Equal(t, tc.uri, binding.URI)
+               })
+       }
+
+}

Reply via email to