This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit f7ad34d9d6b8ad5f41f0be56f43171db8c9a8537
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Tue Mar 16 16:17:56 2021 +0100

    feat(kamelets): error handling kamelets binding support
    
    * Added support for an optional `errorHandler` in the KameletBinding spec; it is 
converted to an `error-handler` in the generated integration.
    * Fixed inspector_yaml.go to parse the integration's `error-handler` and 
load the Kamelet source it declares
---
 pkg/apis/camel/v1alpha1/kamelet_binding_types.go |  9 ++++--
 pkg/controller/kameletbinding/common.go          | 36 +++++++++++++++++++++---
 pkg/trait/kamelets.go                            |  1 +
 pkg/util/source/inspector_yaml.go                |  4 +++
 pkg/util/source/types.go                         |  2 ++
 5 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go 
b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go
index 8fea2ac..b91ed55 100644
--- a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go
+++ b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go
@@ -32,6 +32,8 @@ type KameletBindingSpec struct {
        Source Endpoint `json:"source,omitempty"`
        // Sink is the destination of the integration defined by this binding
        Sink Endpoint `json:"sink,omitempty"`
+       // ErrorHandler is an optional handler called upon an error occurring in 
the integration
+       ErrorHandler Endpoint `json:"errorHandler,omitempty"`
        // Steps contains an optional list of intermediate steps that are 
executed between the Source and the Sink
        Steps []Endpoint `json:"steps,omitempty"`
 }
@@ -51,9 +53,10 @@ type Endpoint struct {
 type EndpointType string
 
 const (
-       EndpointTypeSource EndpointType = "source"
-       EndpointTypeAction EndpointType = "action"
-       EndpointTypeSink   EndpointType = "sink"
+       EndpointTypeSource       EndpointType = "source"
+       EndpointTypeAction       EndpointType = "action"
+       EndpointTypeSink         EndpointType = "sink"
+       EndpointTypeErrorHandler EndpointType = "errorHandler"
 )
 
 // EndpointProperties is a key/value struct represented as JSON raw to allow 
numeric/boolean values
diff --git a/pkg/controller/kameletbinding/common.go 
b/pkg/controller/kameletbinding/common.go
index 411bde8..354dfa2 100644
--- a/pkg/controller/kameletbinding/common.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -79,6 +79,10 @@ func createIntegrationFor(ctx context.Context, c 
client.Client, kameletbinding *
        if err != nil {
                return nil, errors.Wrap(err, "could not determine sink URI")
        }
+       onError, err := bindings.Translate(bindingContext, 
bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, 
kameletbinding.Spec.ErrorHandler)
+       if err != nil {
+               return nil, errors.Wrap(err, "could not determine error handler 
URI")
+       }
 
        steps := make([]*bindings.Binding, 0, len(kameletbinding.Spec.Steps))
        for idx, step := range kameletbinding.Spec.Steps {
@@ -93,10 +97,13 @@ func createIntegrationFor(ctx context.Context, c 
client.Client, kameletbinding *
                steps = append(steps, stepBinding)
        }
 
-       allBindings := make([]*bindings.Binding, 0, len(steps)+2)
+       allBindings := make([]*bindings.Binding, 0, len(steps)+3)
        allBindings = append(allBindings, from)
        allBindings = append(allBindings, steps...)
        allBindings = append(allBindings, to)
+       if onError != nil {
+               allBindings = append(allBindings, onError)
+       }
 
        propList := make([]string, 0)
        for _, b := range allBindings {
@@ -129,17 +136,38 @@ func createIntegrationFor(ctx context.Context, c 
client.Client, kameletbinding *
                "to": to.URI,
        })
 
-       flow := map[string]interface{}{
+       // Append Error Handler flow, if it exists
+       if onError != nil {
+               flowErrorHandler := map[string]interface{}{
+                       "error-handler": map[string]interface{}{
+                               "dead-letter-channel": map[string]interface{}{
+                                       "dead-letter-uri":                  
onError.URI,
+                                       "dead-letter-handle-new-exception": 
true,
+                                       "async-delayed-redelivery":         
false,
+                                       "use-original-message":             
true,
+                               },
+                       },
+               }
+               encodedErrorHandler, err := json.Marshal(flowErrorHandler)
+
+               if err != nil {
+                       return nil, err
+               }
+               it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: 
encodedErrorHandler})
+       }
+
+       // Append From flow: it must exist
+       flowFrom := map[string]interface{}{
                "from": map[string]interface{}{
                        "uri":   from.URI,
                        "steps": dslSteps,
                },
        }
-       encodedFlow, err := json.Marshal(flow)
+       encodedFrom, err := json.Marshal(flowFrom)
        if err != nil {
                return nil, err
        }
-       it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow})
+       it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFrom})
 
        return &it, nil
 }
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index df437d9..9d55b19 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -103,6 +103,7 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, 
error) {
                        metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.FromURIs))
                                util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.ToURIs))
+                               util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.ErrorHandlerURIs))
                                return true
                        })
                        sort.Strings(kamelets)
diff --git a/pkg/util/source/inspector_yaml.go 
b/pkg/util/source/inspector_yaml.go
index 1f96ae8..4c47dc2 100644
--- a/pkg/util/source/inspector_yaml.go
+++ b/pkg/util/source/inspector_yaml.go
@@ -78,6 +78,10 @@ func (i YAMLInspector) parseStep(key string, content 
interface{}, meta *Metadata
                                }
                        }
                }
+       case "error-handler":
+               deadLetterChannel := content.(map[interface{}]interface{})
+               deadLetterURI := 
deadLetterChannel["dead-letter-channel"].(map[interface{}]interface{})
+               meta.ErrorHandlerURIs = append(meta.ErrorHandlerURIs, 
deadLetterURI["dead-letter-uri"].(string))
        }
 
        var maybeURI string
diff --git a/pkg/util/source/types.go b/pkg/util/source/types.go
index f6f15e0..46074f8 100644
--- a/pkg/util/source/types.go
+++ b/pkg/util/source/types.go
@@ -25,6 +25,8 @@ type Metadata struct {
        FromURIs []string
        // All end URIs of defined routes
        ToURIs []string
+       // All error handler URIs of defined routes
+       ErrorHandlerURIs []string
        // All inferred dependencies required to run the integration
        Dependencies *strset.Set
        // ExposesHTTPServices indicates if a route defined by the source is 
exposed

Reply via email to