This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 1849ca5223a865eb1cf3b7acab7a78c3540f8ea0
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Tue Apr 13 14:20:45 2021 +0200

    feat(kamelets): error handling kamelets binding support
    
    Mark error-handler kamelet source with errorHandler source type
---
 pkg/apis/camel/v1/common_types.go       |   5 +-
 pkg/controller/kameletbinding/common.go |  52 ++++++++--------
 pkg/trait/kamelets.go                   | 106 ++++++++++++++++++++++++--------
 pkg/util/source/inspector_yaml.go       |   4 --
 pkg/util/source/types.go                |   2 -
 5 files changed, 109 insertions(+), 60 deletions(-)

diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go
index a94d604..e5b327b 100644
--- a/pkg/apis/camel/v1/common_types.go
+++ b/pkg/apis/camel/v1/common_types.go
@@ -206,8 +206,9 @@ type SourceSpec struct {
 type SourceType string
 
 const (
-       SourceTypeDefault  SourceType = ""
-       SourceTypeTemplate SourceType = "template"
+       SourceTypeDefault      SourceType = ""
+       SourceTypeTemplate     SourceType = "template"
+       SourceTypeErrorHandler SourceType = "errorHandler"
 )
 
 // DataSpec --
diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go
index 32d7182..fdeab95 100644
--- a/pkg/controller/kameletbinding/common.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -22,6 +22,7 @@ import (
        "encoding/json"
        "fmt"
        "sort"
+       "strings"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -79,12 +80,17 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
        if err != nil {
                return nil, errors.Wrap(err, "could not determine sink URI")
        }
-       var onError *bindings.Binding
+       var errorHandler *bindings.Binding
        if kameletbinding.Spec.ErrorHandler.Ref != nil || 
kameletbinding.Spec.ErrorHandler.URI != nil {
-               onError, err = bindings.Translate(bindingContext, 
bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, 
kameletbinding.Spec.ErrorHandler)
+               errorHandler, err = bindings.Translate(bindingContext, 
bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, 
kameletbinding.Spec.ErrorHandler)
                if err != nil {
                        return nil, errors.Wrap(err, "could not determine error 
handler URI")
                }
+
+               err = setErrorHandlerKamelet(errorHandler, 
kameletbinding.Spec.ErrorHandler)
+               if err != nil {
+                       return nil, errors.Wrap(err, "could not set error 
handler")
+               }
        }
 
        steps := make([]*bindings.Binding, 0, len(kameletbinding.Spec.Steps))
@@ -104,8 +110,8 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
        allBindings = append(allBindings, from)
        allBindings = append(allBindings, steps...)
        allBindings = append(allBindings, to)
-       if onError != nil {
-               allBindings = append(allBindings, onError)
+       if errorHandler != nil {
+               allBindings = append(allBindings, errorHandler)
        }
 
        propList := make([]string, 0)
@@ -139,27 +145,6 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
                "to": to.URI,
        })
 
-       // Append Error Handler flow, if it exists
-       if onError != nil {
-               flowErrorHandler := map[string]interface{}{
-                       "error-handler": map[string]interface{}{
-                               "dead-letter-channel": map[string]interface{}{
-                                       "dead-letter-uri":                  
onError.URI,
-                                       "dead-letter-handle-new-exception": 
true,
-                                       "async-delayed-redelivery":         
false,
-                                       "use-original-message":             
true,
-                               },
-                       },
-               }
-               encodedErrorHandler, err := json.Marshal(flowErrorHandler)
-
-               if err != nil {
-                       return nil, err
-               }
-               it.Spec.Flows = append(it.Spec.Flows, v1.Flow{RawMessage: 
encodedErrorHandler})
-       }
-
-       // Append From flow: it must exist
        flowFrom := map[string]interface{}{
                "from": map[string]interface{}{
                        "uri":   from.URI,
@@ -175,6 +160,23 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding *
        return &it, nil
 }
 
+func setErrorHandlerKamelet(errorHandler *bindings.Binding, kameletSpec 
v1alpha1.Endpoint) error {
+       if errorHandler.ApplicationProperties == nil {
+               errorHandler.ApplicationProperties = make(map[string]string)
+       }
+       if kameletSpec.URI != nil {
+               if !strings.HasPrefix(*kameletSpec.URI, "kamelet") {
+                       return fmt.Errorf("Kamelet Binding only supports 
kamelet as error handler, provided: %s", *kameletSpec.URI)
+               }
+
+               
errorHandler.ApplicationProperties["camel.k.default-error-handler"] = 
*kameletSpec.URI
+               return nil
+       }
+
+       errorHandler.ApplicationProperties["camel.k.default-error-handler"] = 
kameletSpec.Ref.Name
+       return nil
+}
+
 func determineProfile(ctx context.Context, c client.Client, binding 
*v1alpha1.KameletBinding) (v1.TraitProfile, error) {
        if binding.Spec.Integration != nil && binding.Spec.Integration.Profile 
!= "" {
                return binding.Spec.Integration.Profile, nil
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 9d55b19..053678d 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -47,6 +47,8 @@ type kameletsTrait struct {
        Auto *bool `property:"auto"`
        // Comma separated list of Kamelet names to load into the current 
integration
        List string `property:"list"`
+       // Kamelet name used as error handler
+       ErrorHandler string `property:"error-handler"`
 }
 
 type configurationKey struct {
@@ -103,16 +105,37 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) {
                        metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.FromURIs))
                                util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.ToURIs))
-                               util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.ErrorHandlerURIs))
                                return true
                        })
                        sort.Strings(kamelets)
                        t.List = strings.Join(kamelets, ",")
                }
+               if t.ErrorHandler == "" {
+                       t.ErrorHandler = 
maybeKameletAsDefaultErrorHandler(e.Integration.Configurations())
+               }
+       }
+
+       return t.declareKamelets(), nil
+}
 
+func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) 
string {
+       for _, property := range properties {
+               if strings.HasPrefix(property.Value, 
"camel.k.default-error-handler=") {
+                       split := strings.Split(property.Value, "=")
+                       if len(split) > 0 {
+                               if strings.HasPrefix(split[1], "kamelet:") {
+                                       return extractKamelet(split[1])
+                               }
+                               return split[1]
+                       }
+               }
        }
 
-       return len(t.getKameletKeys()) > 0, nil
+       return ""
+}
+
+func (t *kameletsTrait) declareKamelets() bool {
+       return len(t.getKameletKeys()) > 0 || t.ErrorHandler != ""
 }
 
 func (t *kameletsTrait) Apply(e *Environment) error {
@@ -133,44 +156,62 @@ func (t *kameletsTrait) Apply(e *Environment) error {
 }
 
 func (t *kameletsTrait) addKamelets(e *Environment) error {
-       kameletKeys := t.getKameletKeys()
-       if len(kameletKeys) > 0 {
+       if t.declareKamelets() {
                repo, err := repository.NewForPlatform(e.C, e.Client, 
e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
                if err != nil {
                        return err
                }
+
+               // Declared kamelets
                for _, k := range t.getKameletKeys() {
-                       kamelet, err := repo.Get(e.C, k)
+                       err := initializeKamelet(repo, e, k, t, 
v1.SourceTypeTemplate)
                        if err != nil {
                                return err
                        }
-                       if kamelet == nil {
-                               return fmt.Errorf("kamelet %s not found in any 
of the defined repositories: %s", k, repo.String())
-                       }
-
-                       // Initialize remote kamelets
-                       kamelet, err = kameletutils.Initialize(kamelet)
+               }
+               if t.ErrorHandler != "" {
+                       // Possible error handler
+                       err = initializeKamelet(repo, e, t.ErrorHandler, t, 
v1.SourceTypeErrorHandler)
                        if err != nil {
                                return err
                        }
-
-                       if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
-                               return fmt.Errorf("kamelet %q is not %s: %s", 
k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
-                       }
-
-                       if err := t.addKameletAsSource(e, kamelet); err != nil {
-                               return err
-                       }
-
-                       // Adding dependencies from Kamelets
-                       
util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, 
kamelet.Spec.Dependencies)
                }
+
                // resort dependencies
                sort.Strings(e.Integration.Status.Dependencies)
        }
        return nil
 }
 
+func initializeKamelet(repo repository.KameletRepository, e *Environment, k 
string, t *kameletsTrait, sourceType v1.SourceType) error {
+       kamelet, err := repo.Get(e.C, k)
+       if err != nil {
+               return err
+       }
+       if kamelet == nil {
+               return fmt.Errorf("kamelet %s not found in any of the defined 
repositories: %s", k, repo.String())
+       }
+
+       // Initialize remote kamelets
+       kamelet, err = kameletutils.Initialize(kamelet)
+       if err != nil {
+               return err
+       }
+
+       if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
+               return fmt.Errorf("kamelet %q is not %s: %s", k, 
v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
+       }
+
+       if err := t.addKameletAsSource(e, kamelet, sourceType); err != nil {
+               return err
+       }
+
+       // Adding dependencies from Kamelets
+       util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, 
kamelet.Spec.Dependencies)
+
+       return nil
+}
+
 func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
        if len(t.getKameletKeys()) > 0 {
                repo, err := repository.NewForPlatform(e.C, e.Client, 
e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
@@ -215,7 +256,7 @@ func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
        return nil
 }
 
-func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet 
*v1alpha1.Kamelet) error {
+func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet 
*v1alpha1.Kamelet, sourceType v1.SourceType) error {
        sources := make([]v1.SourceSpec, 0)
 
        if kamelet.Spec.Flow != nil {
@@ -236,7 +277,7 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam
                                Content: string(flowData),
                        },
                        Language:      v1.LanguageYaml,
-                       Type:          v1.SourceTypeTemplate,
+                       Type:          sourceType,
                        PropertyNames: propertyNames,
                }
                flowSource, err = integrationSourceFromKameletSource(e, 
kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, 
kamelet.Name))
@@ -247,6 +288,9 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet *v1alpha1.Kam
        }
 
        for idx, s := range kamelet.Spec.Sources {
+               if sourceType == v1.SourceTypeErrorHandler {
+                       s.Type = sourceType
+               }
                intSource, err := integrationSourceFromKameletSource(e, 
kamelet, s, fmt.Sprintf("%s-kamelet-%s-%03d", e.Integration.Name, kamelet.Name, 
idx))
                if err != nil {
                        return err
@@ -405,10 +449,18 @@ func integrationSourceFromKameletSource(e *Environment, kamelet *v1alpha1.Kamele
 
 func extractKamelets(uris []string) (kamelets []string) {
        for _, uri := range uris {
-               matches := kameletNameRegexp.FindStringSubmatch(uri)
-               if len(matches) > 1 {
-                       kamelets = append(kamelets, matches[1])
+               kamelet := extractKamelet(uri)
+               if kamelet != "" {
+                       kamelets = append(kamelets, kamelet)
                }
        }
        return
 }
+
+func extractKamelet(uri string) (kamelet string) {
+       matches := kameletNameRegexp.FindStringSubmatch(uri)
+       if len(matches) > 1 {
+               return matches[1]
+       }
+       return ""
+}
diff --git a/pkg/util/source/inspector_yaml.go b/pkg/util/source/inspector_yaml.go
index 4c47dc2..1f96ae8 100644
--- a/pkg/util/source/inspector_yaml.go
+++ b/pkg/util/source/inspector_yaml.go
@@ -78,10 +78,6 @@ func (i YAMLInspector) parseStep(key string, content interface{}, meta *Metadata
                                }
                        }
                }
-       case "error-handler":
-               deadLetterChannel := content.(map[interface{}]interface{})
-               deadLetterURI := 
deadLetterChannel["dead-letter-channel"].(map[interface{}]interface{})
-               meta.ErrorHandlerURIs = append(meta.ErrorHandlerURIs, 
deadLetterURI["dead-letter-uri"].(string))
        }
 
        var maybeURI string
diff --git a/pkg/util/source/types.go b/pkg/util/source/types.go
index 46074f8..f6f15e0 100644
--- a/pkg/util/source/types.go
+++ b/pkg/util/source/types.go
@@ -25,8 +25,6 @@ type Metadata struct {
        FromURIs []string
        // All end URIs of defined routes
        ToURIs []string
-       // All error handlers URIs of defined routes
-       ErrorHandlerURIs []string
        // All inferred dependencies required to run the integration
        Dependencies *strset.Set
        // ExposesHTTPServices indicates if a route defined by the source is 
exposed

Reply via email to