This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 92873431e970b65a1dee3aff8fc8ebd5b07a8fd7
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Tue Apr 20 16:14:41 2021 +0200

    feat(trait): error handler trait
    
    * KameletBinding can specify the error handler type
    * Introduced a trait to parse and create an error handler source type as 
expected by runtime.
---
 pkg/apis/camel/v1alpha1/kamelet_binding_types.go |  20 ++-
 pkg/controller/kameletbinding/common.go          |  17 +--
 pkg/resources/resources.go                       |   4 +-
 pkg/trait/error_handler.go                       | 162 +++++++++++++++++++++++
 pkg/trait/kamelets.go                            |  48 +++----
 pkg/trait/trait_register.go                      |   1 +
 pkg/trait/trait_test.go                          |   2 +-
 7 files changed, 207 insertions(+), 47 deletions(-)

diff --git a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go 
b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go
index b91ed55..367692c 100644
--- a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go
+++ b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go
@@ -33,11 +33,25 @@ type KameletBindingSpec struct {
        // Sink is the destination of the integration defined by this binding
        Sink Endpoint `json:"sink,omitempty"`
        // ErrorHandler is an optional handler called upon an error occurring in 
the integration
-       ErrorHandler Endpoint `json:"errorHandler,omitempty"`
+       ErrorHandler ErrorHandler `json:"errorHandler,omitempty"`
        // Steps contains an optional list of intermediate steps that are 
executed between the Source and the Sink
        Steps []Endpoint `json:"steps,omitempty"`
 }
 
+type ErrorHandler struct {
+       Endpoint      Endpoint                `json:"endpoint,omitempty"`
+       Type          ErrorHandlerType        `json:"type,omitempty"`
+       Configuration *ErrorHandlerProperties `json:"configuration,omitempty"`
+}
+
+type ErrorHandlerType string
+
+const (
+       ErrorHandlerTypeNone              ErrorHandlerType = "no"
+       ErrorHandlerTypeDefault           ErrorHandlerType = "default"
+       ErrorHandlerTypeDeadLetterChannel ErrorHandlerType = 
"dead-letter-channel"
+)
+
 // Endpoint represents a source/sink external entity
 type Endpoint struct {
        // Ref can be used to declare a Kubernetes resource as source/sink 
endpoint
@@ -59,6 +73,10 @@ const (
        EndpointTypeErrorHandler EndpointType = "errorHandler"
 )
 
+type ErrorHandlerProperties struct {
+       v1.RawMessage `json:",inline"`
+}
+
 // EndpointProperties is a key/value struct represented as JSON raw to allow 
numeric/boolean values
 type EndpointProperties struct {
        v1.RawMessage `json:",inline"`
diff --git a/pkg/controller/kameletbinding/common.go 
b/pkg/controller/kameletbinding/common.go
index fdeab95..a63780f 100644
--- a/pkg/controller/kameletbinding/common.go
+++ b/pkg/controller/kameletbinding/common.go
@@ -22,7 +22,6 @@ import (
        "encoding/json"
        "fmt"
        "sort"
-       "strings"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -81,8 +80,8 @@ func createIntegrationFor(ctx context.Context, c 
client.Client, kameletbinding *
                return nil, errors.Wrap(err, "could not determine sink URI")
        }
        var errorHandler *bindings.Binding
-       if kameletbinding.Spec.ErrorHandler.Ref != nil || 
kameletbinding.Spec.ErrorHandler.URI != nil {
-               errorHandler, err = bindings.Translate(bindingContext, 
bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, 
kameletbinding.Spec.ErrorHandler)
+       if kameletbinding.Spec.ErrorHandler.Endpoint.Ref != nil || 
kameletbinding.Spec.ErrorHandler.Endpoint.URI != nil {
+               errorHandler, err = bindings.Translate(bindingContext, 
bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, 
kameletbinding.Spec.ErrorHandler.Endpoint)
                if err != nil {
                        return nil, errors.Wrap(err, "could not determine error 
handler URI")
                }
@@ -160,20 +159,14 @@ func createIntegrationFor(ctx context.Context, c 
client.Client, kameletbinding *
        return &it, nil
 }
 
-func setErrorHandlerKamelet(errorHandler *bindings.Binding, kameletSpec 
v1alpha1.Endpoint) error {
+func setErrorHandlerKamelet(errorHandler *bindings.Binding, errorHandlerSpec 
v1alpha1.ErrorHandler) error {
        if errorHandler.ApplicationProperties == nil {
                errorHandler.ApplicationProperties = make(map[string]string)
        }
-       if kameletSpec.URI != nil {
-               if !strings.HasPrefix(*kameletSpec.URI, "kamelet") {
-                       return fmt.Errorf("Kamelet Binding only supports 
kamelet as error handler, provided: %s", *kameletSpec.URI)
-               }
 
-               
errorHandler.ApplicationProperties["camel.k.default-error-handler"] = 
*kameletSpec.URI
-               return nil
-       }
+       errorHandler.ApplicationProperties["camel.k.default-error-handler.uri"] 
= errorHandler.URI
+       
errorHandler.ApplicationProperties["camel.k.default-error-handler.type"] = 
string(errorHandlerSpec.Type)
 
-       errorHandler.ApplicationProperties["camel.k.default-error-handler"] = 
kameletSpec.Ref.Name
        return nil
 }
 
diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go
index 6b712ac..7603bbb 100644
--- a/pkg/resources/resources.go
+++ b/pkg/resources/resources.go
@@ -113,9 +113,9 @@ var assets = func() http.FileSystem {
                "/crd/bases/camel.apache.org_kameletbindings.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             
"camel.apache.org_kameletbindings.yaml",
                        modTime:          time.Time{},
-                       uncompressedSize: 55181,
+                       uncompressedSize: 68634,
 
-                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xf1\x6f\xdb\x38\xf2\xef\xef\xf9\x2b\x06\xc9\x01\x6d\x01\x4b\x8e\xe3\xb4\xbb\xeb\xf7\x43\x91\x26\xdb\x7b\x7e\xed\xa6\x45\x92\xde\xe1\x5e\xdb\x03\x68\x69\x6c\xf3\x22\x91\x3a\x92\x8a\xe3\xef\xb6\xff\xfb\x17\x24\x25\x59\x76\x2c\x99\x72\xec\x6e\x0a\x88\xc0\x62\x6b\x8b\x1a\xce\x0c\x87\xc3\x99\x21\x3f\xce\x11\x78\xbb\x6b\x07\x47\xf0\x9e\x06\xc8\x24\x86\xa0\x38\xa8\x29\xc2\x59\x42\x82\x29\xc2\x35\x1f\xab\x19\x11\x08\x
 [...]
+                       compressedContent: 
[]byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x5d\xff\x6f\xdb\x38\xb2\xff\x3d\x7f\xc5\x20\x39\xa0\x2d\x10\xd9\x71\x92\x76\x77\xfd\x7e\x28\xb2\xc9\xf6\xce\xaf\xdd\x34\xc8\x97\x3b\xdc\x6b\x7b\x00\x2d\x8d\x6d\x5e\x24\x52\x47\x52\x71\xfc\xb6\xfd\xdf\x1f\x48\x4a\xb2\xfc\x45\x12\xe5\xd8\xdd\xf4\x41\x04\x8a\xc6\x16\x35\x9c\x19\x0e\x87\xdf\xe6\x33\x3e\x00\x6f\x7b\x65\xef\x00\x3e\x50\x1f\x99\xc4\x00\x14\x07\x35\x41\x38\x8b\x89\x3f\x41\xb8\xe1\x23\x35\x25\x02\xe1\x1d\x
 [...]
                },
                "/crd/bases/camel.apache.org_kamelets.yaml": 
&vfsgen۰CompressedFileInfo{
                        name:             "camel.apache.org_kamelets.yaml",
diff --git a/pkg/trait/error_handler.go b/pkg/trait/error_handler.go
new file mode 100644
index 0000000..7101d9e
--- /dev/null
+++ b/pkg/trait/error_handler.go
@@ -0,0 +1,162 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package trait
+
+import (
+       "fmt"
+       "strings"
+
+       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+)
+
+// The error handler is a platform trait used to inject Error Handler source 
into the integration runtime.
+//
+// +camel-k:trait=error-handler
+type errorHandlerTrait struct {
+       BaseTrait `property:",squash"`
+       // Automatically read the error handler type and URI from the integration 
configuration properties (enabled by default)
+       Auto *bool `property:"auto"`
+
+       // TODO move into a struct
+       ErrorHandlerURI  string
+       ErrorHandlerType string
+}
+
+func newErrorHandlerTrait() Trait {
+       return &errorHandlerTrait{
+               BaseTrait: NewBaseTrait("errorHandler", 500),
+       }
+}
+
+// IsPlatformTrait overrides base class method
+func (t *errorHandlerTrait) IsPlatformTrait() bool {
+       return true
+}
+
+func (t *errorHandlerTrait) Configure(e *Environment) (bool, error) {
+       if t.Enabled != nil && !*t.Enabled {
+               return false, nil
+       }
+
+       if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, 
v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
+               return false, nil
+       }
+
+       if t.Auto == nil || *t.Auto {
+               if t.ErrorHandlerType == "" {
+                       t.ErrorHandlerType = 
maybeErrorHandler(e.Integration.Configurations())
+                       if t.ErrorHandlerType != "" && 
v1alpha1.ErrorHandlerType(t.ErrorHandlerType) == 
v1alpha1.ErrorHandlerTypeDeadLetterChannel {
+                               t.ErrorHandlerURI = 
maybeKameletAsDefaultErrorHandler(e.Integration.Configurations())
+                       }
+               }
+       }
+
+       return t.ErrorHandlerType != "", nil
+}
+
+func maybeErrorHandler(properties []v1.ConfigurationSpec) string {
+       for _, property := range properties {
+               if strings.HasPrefix(property.Value, 
"camel.k.default-error-handler.type=") {
+                       split := strings.Split(property.Value, "=")
+                       if len(split) > 0 {
+                               return split[1]
+                       }
+               }
+       }
+
+       return ""
+}
+
+func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) 
string {
+       for _, property := range properties {
+               if strings.HasPrefix(property.Value, 
"camel.k.default-error-handler.uri=") {
+                       split := strings.Split(property.Value, "=")
+                       if len(split) > 0 {
+                               return split[1]
+                       }
+               }
+       }
+
+       return ""
+}
+
+func (t *errorHandlerTrait) Apply(e *Environment) error {
+
+       if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+               if t.ErrorHandlerType != "" {
+                       // Possible error handler
+                       err := addErrorHandlerAsSource(e, t.ErrorHandlerURI, 
t.ErrorHandlerType)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+func addErrorHandlerAsSource(e *Environment, errorHandlerURI string, 
errorHandlerType string) error {
+       errorHandlerStatement, err := parseErrorHandler(errorHandlerURI, 
errorHandlerType)
+       if err != nil {
+               return err
+       }
+
+       // TODO change to yaml flow when we fix 
https://issues.apache.org/jira/browse/CAMEL-16486
+       errorHandlerSource := v1.SourceSpec{
+               DataSpec: v1.DataSpec{
+                       Name: "ErrorHandlerSource.java",
+                       Content: fmt.Sprintf(`
+                       import org.apache.camel.builder.RouteBuilder;
+                       public class ErrorHandlerSource extends RouteBuilder {
+                       @Override
+                       public void configure() throws Exception {
+                         %s
+                         }
+                       }
+                       `, errorHandlerStatement),
+               },
+               Language: v1.LanguageJavaSource,
+               Type:     v1.SourceTypeErrorHandler,
+       }
+
+       replaced := false
+       for idx, existing := range e.Integration.Status.GeneratedSources {
+               if existing.Name == errorHandlerSource.Name {
+                       replaced = true
+                       e.Integration.Status.GeneratedSources[idx] = 
errorHandlerSource
+               }
+       }
+       if !replaced {
+               e.Integration.Status.GeneratedSources = 
append(e.Integration.Status.GeneratedSources, errorHandlerSource)
+       }
+
+       return nil
+}
+
+func parseErrorHandler(errHandlUri string, errHandlType string) (string, 
error) {
+       switch errHandlType {
+       case "no":
+               return `errorHandler(noErrorHandler());`, nil
+       case "default":
+               return `errorHandler(defaultErrorHandler());`, nil
+       case "dead-letter-channel":
+               return fmt.Sprintf(`errorHandler(deadLetterChannel("%v"));`, 
errHandlUri), nil
+       }
+
+       return "", fmt.Errorf("Cannot recognize any error handler of type %s", 
errHandlType)
+}
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 053678d..dd4c3bb 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -47,8 +47,10 @@ type kameletsTrait struct {
        Auto *bool `property:"auto"`
        // Comma separated list of Kamelet names to load into the current 
integration
        List string `property:"list"`
-       // Kamelet name used as error handler
-       ErrorHandler string `property:"error-handler"`
+
+       // TODO move into a struct
+       ErrorHandlerURI  string
+       ErrorHandlerType string
 }
 
 type configurationKey struct {
@@ -96,8 +98,8 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, 
error) {
        }
 
        if t.Auto == nil || *t.Auto {
+               var kamelets []string
                if t.List == "" {
-                       var kamelets []string
                        sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
                        if err != nil {
                                return false, err
@@ -108,34 +110,25 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, 
error) {
                                return true
                        })
                        sort.Strings(kamelets)
-                       t.List = strings.Join(kamelets, ",")
-               }
-               if t.ErrorHandler == "" {
-                       t.ErrorHandler = 
maybeKameletAsDefaultErrorHandler(e.Integration.Configurations())
                }
-       }
-
-       return t.declareKamelets(), nil
-}
-
-func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) 
string {
-       for _, property := range properties {
-               if strings.HasPrefix(property.Value, 
"camel.k.default-error-handler=") {
-                       split := strings.Split(property.Value, "=")
-                       if len(split) > 0 {
-                               if strings.HasPrefix(split[1], "kamelet:") {
-                                       return extractKamelet(split[1])
+               if t.ErrorHandlerType == "" {
+                       t.ErrorHandlerType = 
maybeErrorHandler(e.Integration.Configurations())
+                       if t.ErrorHandlerType != "" && 
v1alpha1.ErrorHandlerType(t.ErrorHandlerType) == 
v1alpha1.ErrorHandlerTypeDeadLetterChannel {
+                               t.ErrorHandlerURI = 
maybeKameletAsDefaultErrorHandler(e.Integration.Configurations())
+                               if strings.HasPrefix(t.ErrorHandlerURI, 
"kamelet:") {
+                                       kamelets = append(kamelets, 
extractKamelet(t.ErrorHandlerURI))
                                }
-                               return split[1]
                        }
                }
+
+               t.List = strings.Join(kamelets, ",")
        }
 
-       return ""
+       return t.declaredKamelets(), nil
 }
 
-func (t *kameletsTrait) declareKamelets() bool {
-       return len(t.getKameletKeys()) > 0 || t.ErrorHandler != ""
+func (t *kameletsTrait) declaredKamelets() bool {
+       return len(t.getKameletKeys()) > 0
 }
 
 func (t *kameletsTrait) Apply(e *Environment) error {
@@ -156,7 +149,7 @@ func (t *kameletsTrait) Apply(e *Environment) error {
 }
 
 func (t *kameletsTrait) addKamelets(e *Environment) error {
-       if t.declareKamelets() {
+       if t.declaredKamelets() {
                repo, err := repository.NewForPlatform(e.C, e.Client, 
e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
                if err != nil {
                        return err
@@ -169,13 +162,6 @@ func (t *kameletsTrait) addKamelets(e *Environment) error {
                                return err
                        }
                }
-               if t.ErrorHandler != "" {
-                       // Possible error handler
-                       err = initializeKamelet(repo, e, t.ErrorHandler, t, 
v1.SourceTypeErrorHandler)
-                       if err != nil {
-                               return err
-                       }
-               }
 
                // resort dependencies
                sort.Strings(e.Integration.Status.Dependencies)
diff --git a/pkg/trait/trait_register.go b/pkg/trait/trait_register.go
index 4c06b37..30e89bd 100644
--- a/pkg/trait/trait_register.go
+++ b/pkg/trait/trait_register.go
@@ -26,6 +26,7 @@ func init() {
        AddToTraits(newOpenAPITrait)
        AddToTraits(newKnativeTrait)
        AddToTraits(newKameletsTrait)
+       AddToTraits(newErrorHandlerTrait)
        AddToTraits(newDependenciesTrait)
        AddToTraits(newBuilderTrait)
        AddToTraits(newQuarkusTrait)
diff --git a/pkg/trait/trait_test.go b/pkg/trait/trait_test.go
index f8f75af..b8fdca8 100644
--- a/pkg/trait/trait_test.go
+++ b/pkg/trait/trait_test.go
@@ -507,7 +507,7 @@ func TestOnlySomeTraitsInfluenceBuild(t *testing.T) {
 
 func TestOnlySomeTraitsArePlatform(t *testing.T) {
        c := NewTraitTestCatalog()
-       platformTraits := []string{"builder", "camel", "jvm", "container", 
"dependencies", "deployer", "deployment", "environment", "kamelets", "openapi", 
"owner", "platform", "quarkus"}
+       platformTraits := []string{"builder", "camel", "jvm", "container", 
"dependencies", "deployer", "deployment", "environment", "errorHandler", 
"kamelets", "openapi", "owner", "platform", "quarkus"}
 
        for _, trait := range c.allTraits() {
                if trait.IsPlatformTrait() {

Reply via email to