This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 92873431e970b65a1dee3aff8fc8ebd5b07a8fd7 Author: Pasquale Congiusti <[email protected]> AuthorDate: Tue Apr 20 16:14:41 2021 +0200 feat(trait): error handler trait * KameletBinding can specify the error handler type * Introduced a trait to parse and create an error handler source type as expected by runtime. --- pkg/apis/camel/v1alpha1/kamelet_binding_types.go | 20 ++- pkg/controller/kameletbinding/common.go | 17 +-- pkg/resources/resources.go | 4 +- pkg/trait/error_handler.go | 162 +++++++++++++++++++++++ pkg/trait/kamelets.go | 48 +++---- pkg/trait/trait_register.go | 1 + pkg/trait/trait_test.go | 2 +- 7 files changed, 207 insertions(+), 47 deletions(-) diff --git a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go index b91ed55..367692c 100644 --- a/pkg/apis/camel/v1alpha1/kamelet_binding_types.go +++ b/pkg/apis/camel/v1alpha1/kamelet_binding_types.go @@ -33,11 +33,25 @@ type KameletBindingSpec struct { // Sink is the destination of the integration defined by this binding Sink Endpoint `json:"sink,omitempty"` // ErrorHandler is an optional handler called upon an error occuring in the integration - ErrorHandler Endpoint `json:"errorHandler,omitempty"` + ErrorHandler ErrorHandler `json:"errorHandler,omitempty"` // Steps contains an optional list of intermediate steps that are executed between the Source and the Sink Steps []Endpoint `json:"steps,omitempty"` } +type ErrorHandler struct { + Endpoint Endpoint `json:"endpoint,omitempty"` + Type ErrorHandlerType `json:"type,omitempty"` + Configuration *ErrorHandlerProperties `json:"configuration,omitempty"` +} + +type ErrorHandlerType string + +const ( + ErrorHandlerTypeNone ErrorHandlerType = "no" + ErrorHandlerTypeDefault ErrorHandlerType = "default" + ErrorHandlerTypeDeadLetterChannel ErrorHandlerType = "dead-letter-channel" +) + // Endpoint represents a source/sink external entity type Endpoint struct { // Ref can be used to declare a Kubernetes resource as source/sink endpoint @@ -59,6 +73,10 @@ const ( EndpointTypeErrorHandler EndpointType = "errorHandler" ) +type ErrorHandlerProperties struct { + v1.RawMessage `json:",inline"` +} + // EndpointProperties is a key/value struct represented as JSON raw to allow numeric/boolean values type EndpointProperties struct { v1.RawMessage `json:",inline"` diff --git a/pkg/controller/kameletbinding/common.go b/pkg/controller/kameletbinding/common.go index fdeab95..a63780f 100644 --- a/pkg/controller/kameletbinding/common.go +++ b/pkg/controller/kameletbinding/common.go @@ -22,7 +22,6 @@ import ( "encoding/json" "fmt" "sort" - "strings" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -81,8 +80,8 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * return nil, errors.Wrap(err, "could not determine sink URI") } var errorHandler *bindings.Binding - if kameletbinding.Spec.ErrorHandler.Ref != nil || kameletbinding.Spec.ErrorHandler.URI != nil { - errorHandler, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler) + if kameletbinding.Spec.ErrorHandler.Endpoint.Ref != nil || kameletbinding.Spec.ErrorHandler.Endpoint.URI != nil { + errorHandler, err = bindings.Translate(bindingContext, bindings.EndpointContext{Type: v1alpha1.EndpointTypeErrorHandler}, kameletbinding.Spec.ErrorHandler.Endpoint) if err != nil { return nil, errors.Wrap(err, "could not determine error handler URI") } @@ -160,20 +159,14 @@ func createIntegrationFor(ctx context.Context, c client.Client, kameletbinding * return &it, nil } -func setErrorHandlerKamelet(errorHandler *bindings.Binding, kameletSpec v1alpha1.Endpoint) error { +func setErrorHandlerKamelet(errorHandler *bindings.Binding, errorHandlerSpec v1alpha1.ErrorHandler) error { if errorHandler.ApplicationProperties == nil { errorHandler.ApplicationProperties = make(map[string]string) } - if kameletSpec.URI != nil { - if !strings.HasPrefix(*kameletSpec.URI, "kamelet") { - return fmt.Errorf("Kamelet Binding only supports kamelet as error handler, provided: %s", *kameletSpec.URI) - } - errorHandler.ApplicationProperties["camel.k.default-error-handler"] = *kameletSpec.URI - return nil - } + errorHandler.ApplicationProperties["camel.k.default-error-handler.uri"] = errorHandler.URI + errorHandler.ApplicationProperties["camel.k.default-error-handler.type"] = string(errorHandlerSpec.Type) - errorHandler.ApplicationProperties["camel.k.default-error-handler"] = kameletSpec.Ref.Name return nil } diff --git a/pkg/resources/resources.go b/pkg/resources/resources.go index 6b712ac..7603bbb 100644 --- a/pkg/resources/resources.go +++ b/pkg/resources/resources.go @@ -113,9 +113,9 @@ var assets = func() http.FileSystem { "/crd/bases/camel.apache.org_kameletbindings.yaml": &vfsgen۰CompressedFileInfo{ name: "camel.apache.org_kameletbindings.yaml", modTime: time.Time{}, - uncompressedSize: 55181, + uncompressedSize: 68634, - compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x7d\xf1\x6f\xdb\x38\xf2\xef\xef\xf9\x2b\x06\xc9\x01\x6d\x01\x4b\x8e\xe3\xb4\xbb\xeb\xf7\x43\x91\x26\xdb\x7b\x7e\xed\xa6\x45\x92\xde\xe1\x5e\xdb\x03\x68\x69\x6c\xf3\x22\x91\x3a\x92\x8a\xe3\xef\xb6\xff\xfb\x17\x24\x25\x59\x76\x2c\x99\x72\xec\x6e\x0a\x88\xc0\x62\x6b\x8b\x1a\xce\x0c\x87\xc3\x99\x21\x3f\xce\x11\x78\xbb\x6b\x07\x47\xf0\x9e\x06\xc8\x24\x86\xa0\x38\xa8\x29\xc2\x59\x42\x82\x29\xc2\x35\x1f\xab\x19\x11\x08\x [...] + compressedContent: []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x5d\xff\x6f\xdb\x38\xb2\xff\x3d\x7f\xc5\x20\x39\xa0\x2d\x10\xd9\x71\x92\x76\x77\xfd\x7e\x28\xb2\xc9\xf6\xce\xaf\xdd\x34\xc8\x97\x3b\xdc\x6b\x7b\x00\x2d\x8d\x6d\x5e\x24\x52\x47\x52\x71\xfc\xb6\xfd\xdf\x1f\x48\x4a\xb2\xfc\x45\x12\xe5\xd8\xdd\xf4\x41\x04\x8a\xc6\x16\x35\x9c\x19\x0e\x87\xdf\xe6\x33\x3e\x00\x6f\x7b\x65\xef\x00\x3e\x50\x1f\x99\xc4\x00\x14\x07\x35\x41\x38\x8b\x89\x3f\x41\xb8\xe1\x23\x35\x25\x02\xe1\x1d\x [...] }, "/crd/bases/camel.apache.org_kamelets.yaml": &vfsgen۰CompressedFileInfo{ name: "camel.apache.org_kamelets.yaml", diff --git a/pkg/trait/error_handler.go b/pkg/trait/error_handler.go new file mode 100644 index 0000000..7101d9e --- /dev/null +++ b/pkg/trait/error_handler.go @@ -0,0 +1,162 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trait + +import ( + "fmt" + "strings" + + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" +) + +// The error handler is a platform trait used to inject Error Handler source into the integration runtime. +// +// +camel-k:trait=error-handler +type errorHandlerTrait struct { + BaseTrait `property:",squash"` + // Automatically inject all referenced Kamelets and their default configuration (enabled by default) + Auto *bool `property:"auto"` + + // TODO move into a struct + ErrorHandlerURI string + ErrorHandlerType string +} + +func newErrorHandlerTrait() Trait { + return &errorHandlerTrait{ + BaseTrait: NewBaseTrait("errorHandler", 500), + } +} + +// IsPlatformTrait overrides base class method +func (t *errorHandlerTrait) IsPlatformTrait() bool { + return true +} + +func (t *errorHandlerTrait) Configure(e *Environment) (bool, error) { + if t.Enabled != nil && !*t.Enabled { + return false, nil + } + + if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) { + return false, nil + } + + if t.Auto == nil || *t.Auto { + if t.ErrorHandlerType == "" { + t.ErrorHandlerType = maybeErrorHandler(e.Integration.Configurations()) + if t.ErrorHandlerType != "" && v1alpha1.ErrorHandlerType(t.ErrorHandlerType) == v1alpha1.ErrorHandlerTypeDeadLetterChannel { + t.ErrorHandlerURI = maybeKameletAsDefaultErrorHandler(e.Integration.Configurations()) + } + } + } + + return t.ErrorHandlerType != "", nil +} + +func maybeErrorHandler(properties []v1.ConfigurationSpec) string { + for _, property := range properties { + if strings.HasPrefix(property.Value, "camel.k.default-error-handler.type=") { + split := strings.Split(property.Value, "=") + if len(split) > 0 { + return split[1] + } + } + } + + return "" +} + +func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) string { + for _, property := range properties { + if strings.HasPrefix(property.Value, "camel.k.default-error-handler.uri=") { + split := strings.Split(property.Value, "=") + if len(split) > 0 { + return split[1] + } + } + } + + return "" +} + +func (t *errorHandlerTrait) Apply(e *Environment) error { + + if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { + if t.ErrorHandlerType != "" { + // Possible error handler + err := addErrorHandlerAsSource(e, t.ErrorHandlerURI, t.ErrorHandlerType) + if err != nil { + return err + } + } + } + return nil +} + +func addErrorHandlerAsSource(e *Environment, errorHandlerURI string, errorHandlerType string) error { + errorHandlerStatement, err := parseErrorHandler(errorHandlerURI, errorHandlerType) + if err != nil { + return err + } + + // TODO change to yaml flow when we fix https://issues.apache.org/jira/browse/CAMEL-16486 + errorHandlerSource := v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "ErrorHandlerSource.java", + Content: fmt.Sprintf(` + import org.apache.camel.builder.RouteBuilder; + public class ErrorHandlerSource extends RouteBuilder { + @Override + public void configure() throws Exception { + %s + } + } + `, errorHandlerStatement), + }, + Language: v1.LanguageJavaSource, + Type: v1.SourceTypeErrorHandler, + } + + replaced := false + for idx, existing := range e.Integration.Status.GeneratedSources { + if existing.Name == errorHandlerSource.Name { + replaced = true + e.Integration.Status.GeneratedSources[idx] = errorHandlerSource + } + } + if !replaced { + e.Integration.Status.GeneratedSources = append(e.Integration.Status.GeneratedSources, errorHandlerSource) + } + + return nil +} + +func parseErrorHandler(errHandlUri string, errHandlType string) (string, error) { + switch errHandlType { + case "no": + return `errorHandler(noErrorHandler());`, nil + case "default": + return `errorHandler(defaultErrorHandler());`, nil + case "dead-letter-channel": + return fmt.Sprintf(`errorHandler(deadLetterChannel("%v"));`, errHandlUri), nil + } + + return "", fmt.Errorf("Cannot recognize any error handler of type %s", errHandlType) +} diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go index 053678d..dd4c3bb 100644 --- a/pkg/trait/kamelets.go +++ b/pkg/trait/kamelets.go @@ -47,8 +47,10 @@ type kameletsTrait struct { Auto *bool `property:"auto"` // Comma separated list of Kamelet names to load into the current integration List string `property:"list"` - // Kamelet name used as error handler - ErrorHandler string `property:"error-handler"` + + // TODO move into a struct + ErrorHandlerURI string + ErrorHandlerType string } type configurationKey struct { @@ -96,8 +98,8 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) { } if t.Auto == nil || *t.Auto { + var kamelets []string if t.List == "" { - var kamelets []string sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) if err != nil { return false, err @@ -108,34 +110,25 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) { return true }) sort.Strings(kamelets) - t.List = strings.Join(kamelets, ",") - } - if t.ErrorHandler == "" { - t.ErrorHandler = maybeKameletAsDefaultErrorHandler(e.Integration.Configurations()) } - } - - return t.declareKamelets(), nil -} - -func maybeKameletAsDefaultErrorHandler(properties []v1.ConfigurationSpec) string { - for _, property := range properties { - if strings.HasPrefix(property.Value, "camel.k.default-error-handler=") { - split := strings.Split(property.Value, "=") - if len(split) > 0 { - if strings.HasPrefix(split[1], "kamelet:") { - return extractKamelet(split[1]) + if t.ErrorHandlerType == "" { + t.ErrorHandlerType = maybeErrorHandler(e.Integration.Configurations()) + if t.ErrorHandlerType != "" && v1alpha1.ErrorHandlerType(t.ErrorHandlerType) == v1alpha1.ErrorHandlerTypeDeadLetterChannel { + t.ErrorHandlerURI = maybeKameletAsDefaultErrorHandler(e.Integration.Configurations()) + if strings.HasPrefix(t.ErrorHandlerURI, "kamelet:") { + kamelets = append(kamelets, extractKamelet(t.ErrorHandlerURI)) } - return split[1] } } + + t.List = strings.Join(kamelets, ",") } - return "" + return t.declaredKamelets(), nil } -func (t *kameletsTrait) declareKamelets() bool { - return len(t.getKameletKeys()) > 0 || t.ErrorHandler != "" +func (t *kameletsTrait) declaredKamelets() bool { + return len(t.getKameletKeys()) > 0 } func (t *kameletsTrait) Apply(e *Environment) error { @@ -156,7 +149,7 @@ func (t *kameletsTrait) Apply(e *Environment) error { } func (t *kameletsTrait) addKamelets(e *Environment) error { - if t.declareKamelets() { + if t.declaredKamelets() { repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace()) if err != nil { return err @@ -169,13 +162,6 @@ func (t *kameletsTrait) addKamelets(e *Environment) error { return err } } - if t.ErrorHandler != "" { - // Possible error handler - err = initializeKamelet(repo, e, t.ErrorHandler, t, v1.SourceTypeErrorHandler) - if err != nil { - return err - } - } // resort dependencies sort.Strings(e.Integration.Status.Dependencies) diff --git a/pkg/trait/trait_register.go b/pkg/trait/trait_register.go index 4c06b37..30e89bd 100644 --- a/pkg/trait/trait_register.go +++ b/pkg/trait/trait_register.go @@ -26,6 +26,7 @@ func init() { AddToTraits(newOpenAPITrait) AddToTraits(newKnativeTrait) AddToTraits(newKameletsTrait) + AddToTraits(newErrorHandlerTrait) AddToTraits(newDependenciesTrait) AddToTraits(newBuilderTrait) AddToTraits(newQuarkusTrait) diff --git a/pkg/trait/trait_test.go b/pkg/trait/trait_test.go index f8f75af..b8fdca8 100644 --- a/pkg/trait/trait_test.go +++ b/pkg/trait/trait_test.go @@ -507,7 +507,7 @@ func TestOnlySomeTraitsInfluenceBuild(t *testing.T) { func TestOnlySomeTraitsArePlatform(t *testing.T) { c := NewTraitTestCatalog() - platformTraits := []string{"builder", "camel", "jvm", "container", "dependencies", "deployer", "deployment", "environment", "kamelets", "openapi", "owner", "platform", "quarkus"} + platformTraits := []string{"builder", "camel", "jvm", "container", "dependencies", "deployer", "deployment", "environment", "errorHandler", "kamelets", "openapi", "owner", "platform", "quarkus"} for _, trait := range c.allTraits() { if trait.IsPlatformTrait() {
