This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit f7ad34d9d6b8ad5f41f0be56f43171db8c9a8537 Author: Pasquale Congiusti <[email protected]> AuthorDate: Tue Mar 16 16:17:56 2021 +0100 feat(kamelets): error handling kamelets binding support * Added the capability to include an `errorHandler` support that will be converted as an `error-handler` in the generated integration. * Fixed the inspector_yaml.go to parse the integration `error-handler` and load declared Kamelet source --- pkg/apis/camel/v1alpha1/kamelet_binding_types.go | 9 ++++-- pkg/controller/kameletbinding/common.go | 36 +++++++++++++++++++++--- pkg/trait/kamelets.go | 1 + pkg/util/source/inspector_yaml.go | 4 +++ pkg/util/source/types.go | 2 ++ 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go index 8fea2ac..b91ed55 100644 --- a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go +++ b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go @@ -32,6 +32,8 @@ type KameletBindingSpec struct { Source Endpoint `json:"source,omitempty"` // Sink is the destination of the integration defined by this binding Sink Endpoint `json:"sink,omitempty"` + // ErrorHandler is an optional handler called upon an error occuring in the integration + ErrorHandler Endpoint `json:"errorHandler,omitempty"` // Steps contains an optional list of intermediate steps that are executed between the Source and the Sink Steps []Endpoint `json:"steps,omitempty"` } @@ -51,9 +53,10 @@ type Endpoint struct { type EndpointType string const ( - EndpointTypeSource EndpointType = "source" - EndpointTypeAction EndpointType = "action" - EndpointTypeSink EndpointType = "sink" + EndpointTypeSource EndpointType = "source" + EndpointTypeAction EndpointType = "action" + EndpointTypeSink EndpointType = "sink" + EndpointTypeErrorHandler EndpointType = "errorHandler" ) // EndpointProperties is a key/value struct represented as JSON raw to allow numeric/boolean values diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go index 411bde8..354dfa2 100644 --- a/pkg/controller/kameletbinding/common.go +++ b/pkg/controller/kameletbinding/common.go @@ -79,6 +79,10 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * if err != nil { return nil, errors.Wrap(err, "could not determine sink URI") } + onError, err := bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler) + if err != nil { + return nil, errors.Wrap(err, "could not determine error handler URI") + } steps := make([]*bindings.Binding, 0, len(kameletbinding.Spec.Steps)) for idx, step := range kameletbinding.Spec.Steps { @@ -93,10 +97,13 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * steps = append(steps, stepBinding) } - allBindings := make([]*bindings.Binding, 0, len(steps)+2) + allBindings := make([]*bindings.Binding, 0, len(steps)+3) allBindings = append(allBindings, from) allBindings = append(allBindings, steps...) allBindings = append(allBindings, to) + if onError != nil { + allBindings = append(allBindings, onError) + } propList := make([]string, 0) for _, b := range allBindings { @@ -129,17 +136,38 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * "to": to.URI, }) - flow := map[string]interface{}{ + // Append Error Handler flow, if it exists + if onError != nil { + flowErrorHandler := map[string]interface{}{ + "error-handler": map[string]interface{}{ + "dead-letter-channel": map[string]interface{}{ + "dead-letter-uri": onError.URI, + "dead-letter-handle-new-exception": true, + "async-delayed-redelivery": false, + "use-original-message": true, + }, + }, + } + encodedErrorHandler, err := json.Marshal(flowErrorHandler) + + if err != nil { + return nil, err + } + it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedErrorHandler}) + } + + // Append From flow: it must exist + flowFrom := map[string]interface{}{ "from": map[string]interface{}{ "uri": from.URI, "steps": dslSteps, }, } - encodedFlow, err := json.Marshal(flow) + encodedFrom, err := json.Marshal(flowFrom) if err != nil { return nil, err } - it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFlow}) + it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedFrom}) return &it, nil } diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go index df437d9..9d55b19 100644 --- a/pkg/trait/kamelets.go +++ b/pkg/trait/kamelets.go @@ -103,6 +103,7 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) { metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs)) util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs)) + util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ErrorHandlerURIs)) return true }) sort.Strings(kamelets) diff --git a/pkg/util/source/inspector_yaml.go b/pkg/util/source/inspector_yaml.go index 1f96ae8..4c47dc2 100644 --- a/pkg/util/source/inspector_yaml.go +++ b/pkg/util/source/inspector_yaml.go @@ -78,6 +78,10 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata } } } + case "error-handler": + deadLetterChannel := content.(map[interface{}]interface{}) + deadLetterURI := deadLetterChannel["dead-letter-channel"].(map[interface{}]interface{}) + meta.ErrorHandlerURIs = append(meta.ErrorHandlerURIs, deadLetterURI["dead-letter-uri"].(string)) } var maybeURI string diff --git a/pkg/util/source/types.go b/pkg/util/source/types.go index f6f15e0..46074f8 100644 --- a/pkg/util/source/types.go +++ b/pkg/util/source/types.go @@ -25,6 +25,8 @@ type Metadata struct { FromURIs []string // All end URIs of defined routes ToURIs []string + // All error handlers URIs of defined routes + ErrorHandlerURIs []string // All inferred dependencies required to run the integration Dependencies *strset.Set // ExposesHTTPServices indicates if a route defined by the source is exposed
