This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 1849ca5223a865eb1cf3b7acab7a78c3540f8ea0 Author: Pasquale Congiusti <[email protected]> AuthorDate: Tue Apr 13 14:20:45 2021 +0200 feat(kamelets): error handling kamelets binding support Mark error-handler kamelet source with errorHandler source type --- pkg/apis/camel/v1/common_types.go | 5 +- pkg/controller/kameletbinding/common.go | 52 ++++++++-------- pkg/trait/kamelets.go | 106 ++++++++++++++++++++++++-------- pkg/util/source/inspector_yaml.go | 4 -- pkg/util/source/types.go | 2 - 5 files changed, 109 insertions(+), 60 deletions(-) diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go index a94d604..e5b327b 100644 --- a/pkg/apis/camel/v1/common_types.go +++ b/pkg/apis/camel/v1/common_types.go @@ -206,8 +206,9 @@ type SourceSpec struct { type SourceType string const ( - SourceTypeDefault SourceType = "" - SourceTypeTemplate SourceType = "template" + SourceTypeDefault SourceType = "" + SourceTypeTemplate SourceType = "template" + SourceTypeErrorHandler SourceType = "errorHandler" ) // DataSpec -- diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go index 32d7182..fdeab95 100644 --- a/pkg/controller/kameletbinding/common.go +++ b/pkg/controller/kameletbinding/common.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "sort" + "strings" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -79,12 +80,17 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * if err != nil { return nil, errors.Wrap(err, "could not determine sink URI") } - var onError *bindings.Binding + var errorHandler *bindings.Binding if kameletbinding.Spec.ErrorHandler.Ref != nil || kameletbinding.Spec.ErrorHandler.URI != nil { - onError, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler) + errorHandler, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler) if err != nil { return nil, errors.Wrap(err, "could not determine error handler URI") } + + err = setErrorHandlerKamelet(errorHandler, kameletbinding.Spec.ErrorHandler) + if err != nil { + return nil, errors.Wrap(err, "could not set error handler") + } } steps := make([]*bindings.Binding, 0, len(kameletbinding.Spec.Steps)) @@ -104,8 +110,8 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * allBindings = append(allBindings, from) allBindings = append(allBindings, steps...) allBindings = append(allBindings, to) - if onError != nil { - allBindings = append(allBindings, onError) + if errorHandler != nil { + allBindings = append(allBindings, errorHandler) } propList := make([]string, 0) @@ -139,27 +145,6 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * "to": to.URI, }) - // Append Error Handler flow, if it exists - if onError != nil { - flowErrorHandler := map[string]interface{}{ - "error-handler": map[string]interface{}{ - "dead-letter-channel": map[string]interface{}{ - "dead-letter-uri": onError.URI, - "dead-letter-handle-new-exception": true, - "async-delayed-redelivery": false, - "use-original-message": true, - }, - }, - } - encodedErrorHandler, err := json.Marshal(flowErrorHandler) - - if err != nil { - return nil, err - } - it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: encodedErrorHandler}) - } - - // Append From flow: it must exist flowFrom := map[string]interface{}{ "from": map[string]interface{}{ "uri": from.URI, @@ -175,6 +160,23 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * return &it, nil } +func setErrorHandlerKamelet(errorHandler *bindings.Binding, kameletSpec v1alpha1.Endpoint) error { + if errorHandler.ApplicationProperties == nil { + errorHandler.ApplicationProperties = make(map[string]string) + } + if kameletSpec.URI != nil { + if !strings.HasPrefix(*kameletSpec.URI, "kamelet") { + return fmt.Errorf("Kamelet Binding only supports kamelet as error handler, provided: %s", *kameletSpec.URI) + } + + errorHandler.ApplicationProperties["camel.k.default-error-handler"] = *kameletSpec.URI + return nil + } + + errorHandler.ApplicationProperties["camel.k.default-error-handler"] = kameletSpec.Ref.Name + return nil +} + func determineProfile(ctx context.Context, c client.Client, binding *v1alpha1.KameletBinding) (v1.TraitProfile, error) { if binding.Spec.Integration != nil && binding.Spec.Integration.Profile != "" { return binding.Spec.Integration.Profile, nil diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go index 9d55b19..053678d 100644 --- a/pkg/trait/kamelets.go +++ b/pkg/trait/kamelets.go @@ -47,6 +47,8 @@ type kameletsTrait struct { Auto *bool `property:"auto"` // Comma separated list of Kamelet names to load into the current integration List string `property:"list"` + // Kamelet name used as error handler + ErrorHandler string `property:"error-handler"` } type configurationKey struct { @@ -103,16 +105,37 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) { metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs)) util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs)) - util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ErrorHandlerURIs)) return true }) sort.Strings(kamelets) t.List = strings.Join(kamelets, ",") } + if t.ErrorHandler == "" { + t.ErrorHandler = maybeKameletAsDefaultErrorHandler(e.Integration.Configurations()) + } + } + + return t.declareKamelets(), nil +} +func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) string { + for _, property := range properties { + if strings.HasPrefix(property.Value, "camel.k.default-error-handler=") { + split := strings.Split(property.Value, "=") + if len(split) > 0 { + if strings.HasPrefix(split[1], "kamelet:") { + return extractKamelet(split[1]) + } + return split[1] + } + } } - return len(t.getKameletKeys()) > 0, nil + return "" +} + +func (t *kameletsTrait) declareKamelets() bool { + return len(t.getKameletKeys()) > 0 || t.ErrorHandler != "" } func (t *kameletsTrait) Apply(e *Environment) error { @@ -133,44 +156,62 @@ func (t *kameletsTrait) Apply(e *Environment) error { } func (t *kameletsTrait) addKamelets(e *Environment) error { - kameletKeys := t.getKameletKeys() - if len(kameletKeys) > 0 { + if t.declareKamelets() { repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace()) if err != nil { return err } + + // Declared kamelets for _, k := range t.getKameletKeys() { - kamelet, err := repo.Get(e.C, k) + err := initializeKamelet(repo, e, k, t, v1.SourceTypeTemplate) if err != nil { return err } - if kamelet == nil { - return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String()) - } - - // Initialize remote kamelets - kamelet, err = kameletutils.Initialize(kamelet) + } + if t.ErrorHandler != "" { + // Possible error handler + err = initializeKamelet(repo, e, t.ErrorHandler, t, v1.SourceTypeErrorHandler) if err != nil { return err } - - if kamelet.Status.Phase != v1alpha1.KameletPhaseReady { - return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase) - } - - if err := t.addKameletAsSource(e, kamelet); err != nil { - return err - } - - // Adding dependencies from Kamelets - util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, kamelet.Spec.Dependencies) } + // resort dependencies sort.Strings(e.Integration.Status.Dependencies) } return nil } +func initializeKamelet(repo repository.KameletRepository, e *Environment, k string, t *kameletsTrait, sourceType v1.SourceType) error { + kamelet, err := repo.Get(e.C, k) + if err != nil { + return err + } + if kamelet == nil { + return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String()) + } + + // Initialize remote kamelets + kamelet, err = kameletutils.Initialize(kamelet) + if err != nil { + return err + } + + if kamelet.Status.Phase != v1alpha1.KameletPhaseReady { + return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase) + } + + if err := t.addKameletAsSource(e, kamelet, sourceType); err != nil { + return err + } + + // Adding dependencies from Kamelets + util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, kamelet.Spec.Dependencies) + + return nil +} + func (t *kameletsTrait) configureApplicationProperties(e *Environment) error { if len(t.getKameletKeys()) > 0 { repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace()) @@ -215,7 +256,7 @@ func (t *kameletsTrait) configureApplicationProperties(e *Environment) error { return nil } -func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kamelet) error { +func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kamelet, sourceType v1.SourceType) error { sources := make([]v1.SourceSpec, 0) if kamelet.Spec.Flow != nil { @@ -236,7 +277,7 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam Content: string(flowData), }, Language: v1.LanguageYaml, - Type: v1.SourceTypeTemplate, + Type: sourceType, PropertyNames: propertyNames, } flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name)) @@ -247,6 +288,9 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam } for idx, s := range kamelet.Spec.Sources { + if sourceType == v1.SourceTypeErrorHandler { + s.Type = sourceType + } intSource, err := integrationSourceFromKameletSource(e, kamelet, s, fmt.Sprintf("%s-kamelet-%s-%03d", e.Integration.Name, kamelet.Name, idx)) if err != nil { return err @@ -405,10 +449,18 @@ func integrationSourceFromKameletSource(e *Environment, kamelet *v1alpha1.Kamele func extractKamelets(uris []string) (kamelets []string) { for _, uri := range uris { - matches := kameletNameRegexp.FindStringSubmatch(uri) - if len(matches) > 1 { - kamelets = append(kamelets, matches[1]) + kamelet := extractKamelet(uri) + if kamelet != "" { + kamelets = append(kamelets, kamelet) } } return } + +func extractKamelet(uri string) (kamelet string) { + matches := kameletNameRegexp.FindStringSubmatch(uri) + if len(matches) > 1 { + return matches[1] + } + return "" +} diff --git a/pkg/util/source/inspector_yaml.go b/pkg/util/source/inspector_yaml.go index 4c47dc2..1f96ae8 100644 --- a/pkg/util/source/inspector_yaml.go +++ b/pkg/util/source/inspector_yaml.go @@ -78,10 +78,6 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata } } } - case "error-handler": - deadLetterChannel := content.(map[interface{}]interface{}) - deadLetterURI := deadLetterChannel["dead-letter-channel"].(map[interface{}]interface{}) - meta.ErrorHandlerURIs = append(meta.ErrorHandlerURIs, deadLetterURI["dead-letter-uri"].(string)) } var maybeURI string diff --git a/pkg/util/source/types.go b/pkg/util/source/types.go index 46074f8..f6f15e0 100644 --- a/pkg/util/source/types.go +++ b/pkg/util/source/types.go @@ -25,8 +25,6 @@ type Metadata struct { FromURIs []string // All end URIs of defined routes ToURIs []string - // All error handlers URIs of defined routes - ErrorHandlerURIs []string // All inferred dependencies required to run the integration Dependencies *strset.Set // ExposesHTTPServices indicates if a route defined by the source is exposed
