This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 5505bec203dbb40b6bbaadb3b2db226a86cf5074 Author: nicolaferraro <[email protected]> AuthorDate: Mon Mar 29 10:28:34 2021 +0200 Fix #2083: add kamel bind command --- pkg/cmd/bind.go | 307 +++++++++++++++++++++++++++++++++++++++++++++++ pkg/cmd/root.go | 3 +- pkg/cmd/run.go | 2 +- pkg/util/uri/uri.go | 7 ++ pkg/util/uri/uri_test.go | 37 ++++++ 5 files changed, 354 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/bind.go b/pkg/cmd/bind.go new file mode 100644 index 0000000..320ffb4 --- /dev/null +++ b/pkg/cmd/bind.go @@ -0,0 +1,307 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package cmd
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+	"github.com/apache/camel-k/pkg/util/reference"
+	"github.com/apache/camel-k/pkg/util/uri"
+	"github.com/spf13/cobra"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// newCmdBind builds the "bind" cobra command and returns it together with the
+// bindCmdOptions value that backs it.
+//
+// PreRunE is set to decode(&options); that package-level decode helper is not
+// defined in this file (presumably it fills options from the flags through
+// the mapstructure tags on bindCmdOptions -- TODO confirm).
+func newCmdBind(rootCmdOptions *RootCmdOptions) (*cobra.Command, *bindCmdOptions) {
+	options := bindCmdOptions{
+		RootCmdOptions: rootCmdOptions,
+	}
+	cmd := cobra.Command{
+		Use:     "bind [source] [sink] ...",
+		Short:   "Bind Kubernetes resources, such as Kamelets, in an integration flow. Endpoints are expected in the format \"[[apigroup/]version:]kind:[namespace/]name\" or plain Camel URIs.",
+		PreRunE: decode(&options),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			// Validation failures are handed back to cobra unchanged.
+			if err := options.validate(cmd, args); err != nil {
+				return err
+			}
+			// NOTE(review): an error from run is only printed to stdout and
+			// RunE still returns nil, so cobra never sees the failure --
+			// confirm this is intended.
+			if err := options.run(args); err != nil {
+				fmt.Println(err.Error())
+			}
+
+			return nil
+		},
+	}
+
+	cmd.Flags().String("name", "", "Name for the binding")
+	cmd.Flags().StringP("output", "o", "", "Output format. One of: json|yaml")
+	cmd.Flags().StringArrayP("property", "p", nil, `Add a binding property in the form of "source.<key>=<value>" or "sink.<key>=<value>"`)
+	cmd.Flags().Bool("skip-checks", false, "Do not verify the binding for compliance with Kamelets and other Kubernetes resources")
+
+	return &cmd, &options
+}
+
+// Endpoint selectors: they are the only accepted prefixes of a property key
+// (see parseProperty) and are passed to decode to pick the properties that
+// belong to the endpoint being decoded.
+const (
+	sourceKey = "source"
+	sinkKey   = "sink"
+)
+
+// bindCmdOptions holds the settings of the bind command on top of the
+// embedded RootCmdOptions.
+type bindCmdOptions struct {
+	*RootCmdOptions
+	// Name is the explicit binding name; when empty, nameFor derives one.
+	Name string `mapstructure:"name" yaml:",omitempty"`
+	// OutputFormat is "", "yaml" or "json"; run rejects any other value.
+	OutputFormat string `mapstructure:"output" yaml:",omitempty"`
+	// Properties are the raw "source.<key>=<value>" / "sink.<key>=<value>" entries.
+	// NOTE(review): the flag is registered as "property" while the tag says
+	// "properties" -- presumably the decode helper maps one to the other; confirm.
+	Properties []string `mapstructure:"properties" yaml:",omitempty"`
+	// SkipChecks, when true, makes validate skip endpoint decoding and checkCompliance.
+	SkipChecks bool `mapstructure:"skip-checks" yaml:",omitempty"`
+}
+
+// validate checks the command line before run is invoked.
+//
+// It requires exactly two positional arguments (source and sink), requires
+// every entry of o.Properties to be accepted by parseProperty and, unless
+// o.SkipChecks is set, decodes both endpoints and passes each one to
+// checkCompliance. The first error encountered is returned.
+func (o *bindCmdOptions) validate(cmd *cobra.Command, args []string) error {
+	if len(args) > 2 {
+		return errors.New("too many arguments: expected source and sink")
+	} else if len(args) < 2 {
+		return errors.New("source or sink arguments are missing")
+	}
+
+	for _, p := range o.Properties {
+		if _, _, _, err := o.parseProperty(p); err != nil {
+			return err
+		}
+	}
+
+	if !o.SkipChecks {
+		// The endpoints decoded here are only used for the compliance check;
+		// run decodes args[0] and args[1] again on its own.
+		source, err := o.decode(args[0], sourceKey)
+		if err != nil {
+			return err
+		}
+		if err := o.checkCompliance(cmd, source); err != nil {
+			return err
+		}
+
+		sink, err := o.decode(args[1], sinkKey)
+		if err != nil {
+			return err
+		}
+		if err := o.checkCompliance(cmd, sink); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// run builds a KameletBinding from args[0] (source) and args[1] (sink) and
+// either prints it or submits it through the command client.
+//
+// With OutputFormat "yaml" or "json" the binding is serialized, written to
+// stdout and nothing is sent to the client. With an empty OutputFormat the
+// binding is created; if the create call reports AlreadyExists, the binding is
+// passed to kubernetes.ReplaceResource instead. Any other OutputFormat is an
+// error. run indexes args[0] and args[1] directly; RunE calls it only after
+// validate has accepted exactly two arguments.
+func (o *bindCmdOptions) run(args []string) error {
+	source, err := o.decode(args[0], sourceKey)
+	if err != nil {
+		return err
+	}
+
+	sink, err := o.decode(args[1], sinkKey)
+	if err != nil {
+		return err
+	}
+
+	name := o.nameFor(source, sink)
+
+	binding := v1alpha1.KameletBinding{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: o.Namespace,
+			Name:      name,
+		},
+		Spec: v1alpha1.KameletBindingSpec{
+			Source: source,
+			Sink:   sink,
+		},
+	}
+
+	switch o.OutputFormat {
+	case "":
+		// continue..
+		// (no output format requested: leave the switch and create the binding below)
+	case "yaml":
+		data, err := kubernetes.ToYAML(&binding)
+		if err != nil {
+			return err
+		}
+		fmt.Print(string(data))
+		return nil
+
+	case "json":
+		data, err := kubernetes.ToJSON(&binding)
+		if err != nil {
+			return err
+		}
+		fmt.Print(string(data))
+		return nil
+
+	default:
+		return fmt.Errorf("invalid output format option '%s', should be one of: yaml|json", o.OutputFormat)
+	}
+
+	// NOTE(review): this local shadows the imported controller-runtime
+	// "client" package for the rest of run; checkCompliance names its
+	// client c instead.
+	client, err := o.GetCmdClient()
+	if err != nil {
+		return err
+	}
+
+	// Try to create first; only an AlreadyExists failure turns into a replace.
+	existed := false
+	err = client.Create(o.Context, &binding)
+	if err != nil && k8serrors.IsAlreadyExists(err) {
+		existed = true
+		err = kubernetes.ReplaceResource(o.Context, client, &binding)
+	}
+	// err is now either the non-AlreadyExists create error or the replace result.
+	if err != nil {
+		return err
+	}
+
+	if !existed {
+		fmt.Printf("kamelet binding \"%s\" created\n", name)
+	} else {
+		fmt.Printf("kamelet binding \"%s\" updated\n", name)
+	}
+	return nil
+}
+
+// decode turns one positional argument into a v1alpha1.Endpoint.
+//
+// key is sourceKey or sinkKey and selects which of o.Properties are attached
+// to the endpoint. res is first parsed as a resource reference with a
+// converter created with reference.KameletPrefix; if that fails and res
+// satisfies uri.HasCamelURIFormat, the endpoint carries res as its URI
+// instead. If neither applies, the reference parsing error is returned. A
+// reference without a namespace gets o.Namespace.
+func (o *bindCmdOptions) decode(res string, key string) (v1alpha1.Endpoint, error) {
+	refConverter := reference.NewConverter(reference.KameletPrefix)
+	endpoint := v1alpha1.Endpoint{}
+	props, err := o.asEndpointProperties(o.getProperties(key))
+	if err != nil {
+		return endpoint, err
+	}
+	endpoint.Properties = props
+
+	ref, err := refConverter.FromString(res)
+	if err != nil {
+		// Not a resource reference: accept it as a plain URI if it looks like one.
+		if uri.HasCamelURIFormat(res) {
+			endpoint.URI = &res
+			return endpoint, nil
+		}
+		return endpoint, err
+	}
+	endpoint.Ref = &ref
+	if endpoint.Ref.Namespace == "" {
+		endpoint.Ref.Namespace = o.Namespace
+	}
+	return endpoint, nil
+}
+
+// nameFor returns the name of the binding: o.Name when it is set, otherwise
+// "<source>-to-<sink>" (parts taken from nameForEndpoint) passed through
+// kubernetes.SanitizeName.
+func (o *bindCmdOptions) nameFor(source, sink v1alpha1.Endpoint) string {
+	if o.Name != "" {
+		return o.Name
+	}
+	sourcePart := o.nameForEndpoint(source)
+	sinkPart := o.nameForEndpoint(sink)
+	name := fmt.Sprintf("%s-to-%s", sourcePart, sinkPart)
+	return kubernetes.SanitizeName(name)
+}
+
+// nameForEndpoint returns the name fragment used for one endpoint: the result
+// of uri.GetComponent for a URI endpoint, the referenced resource name for a
+// Ref endpoint, or "" when the endpoint has neither.
+func (o *bindCmdOptions) nameForEndpoint(endpoint v1alpha1.Endpoint) string {
+	if endpoint.URI != nil {
+		return uri.GetComponent(*endpoint.URI)
+	}
+	if endpoint.Ref != nil {
+		return endpoint.Ref.Name
+	}
+	return ""
+}
+
+// asEndpointProperties wraps props as v1alpha1.EndpointProperties by
+// JSON-encoding the map into its RawMessage field. An empty or nil map yields
+// (nil, nil), i.e. no properties object at all.
+func (o *bindCmdOptions) asEndpointProperties(props map[string]string) (*v1alpha1.EndpointProperties, error) {
+	if len(props) == 0 {
+		return nil, nil
+	}
+	data, err := json.Marshal(props)
+	if err != nil {
+		return nil, err
+	}
+	return &v1alpha1.EndpointProperties{
+		RawMessage: v1.RawMessage(data),
+	}, nil
+}
+
+// getProperties collects the key/value pairs of o.Properties whose prefix
+// equals refType (sourceKey or sinkKey). Entries rejected by parseProperty
+// are skipped silently here; validate reports them as errors. When a key
+// occurs more than once, the last occurrence wins.
+func (o *bindCmdOptions) getProperties(refType string) map[string]string {
+	props := make(map[string]string)
+	for _, p := range o.Properties {
+		tp, k, v, err := o.parseProperty(p)
+		if err != nil {
+			continue
+		}
+		if tp == refType {
+			props[k] = v
+		}
+	}
+	return props
+}
+
+// parseProperty splits a "<source|sink>.<key>=<value>" string and returns the
+// endpoint prefix, the key and the value, in that order.
+//
+// The string is split on the first "=" only, so the value may itself contain
+// "="; the left-hand side is split on the first "." only, so the key may
+// contain further dots. An error is returned when there is no "=", when the
+// left-hand side has no ".", or when the prefix is neither sourceKey nor
+// sinkKey.
+// NOTE(review): an empty key or value (e.g. "source.=x", "sink.k=") passes
+// these checks -- confirm that is acceptable.
+func (o *bindCmdOptions) parseProperty(prop string) (string, string, string, error) {
+	parts := strings.SplitN(prop, "=", 2)
+	if len(parts) != 2 {
+		return "", "", "", fmt.Errorf(`property %q does not follow format "[source|sink].<key>=<value>"`, prop)
+	}
+	keyParts := strings.SplitN(parts[0], ".", 2)
+	if len(keyParts) != 2 {
+		return "", "", "", fmt.Errorf(`property key %q does not follow format "[source|sink].<key>"`, parts[0])
+	}
+	if keyParts[0] != sourceKey && keyParts[0] != sinkKey {
+		return "", "", "", fmt.Errorf(`property key %q does not start with "source." or "sink."`, parts[0])
+	}
+	return keyParts[0], keyParts[1], parts[1], nil
+}
+
+// checkCompliance verifies an endpoint against the resource it references.
+//
+// Only endpoints whose Ref has Kind "Kamelet" are inspected; every other
+// endpoint is accepted. The Kamelet is fetched by the namespace and name of
+// the reference. If it is not found, a warning is written to
+// cmd.OutOrStderr() and nil is returned. Otherwise, every name listed in the
+// Kamelet's Spec.Definition.Required must be a key of the endpoint's property
+// map, or an error naming the first missing property is returned.
+func (o *bindCmdOptions) checkCompliance(cmd *cobra.Command, endpoint v1alpha1.Endpoint) error {
+	if endpoint.Ref != nil && endpoint.Ref.Kind == "Kamelet" {
+		c, err := o.GetCmdClient()
+		if err != nil {
+			return err
+		}
+		key := client.ObjectKey{
+			Namespace: endpoint.Ref.Namespace,
+			Name:      endpoint.Ref.Name,
+		}
+		kamelet := v1alpha1.Kamelet{}
+		if err := c.Get(o.Context, key, &kamelet); err != nil {
+			if k8serrors.IsNotFound(err) {
+				// Kamelet may be in the operator namespace, but we currently don't have a way to determine it: we just warn
+				fmt.Fprintf(cmd.OutOrStderr(), "Warning: Kamelet %q not found in namespace %q\n", key.Name, key.Namespace)
+				return nil
+			}
+			return err
+		}
+		if kamelet.Spec.Definition != nil && len(kamelet.Spec.Definition.Required) > 0 {
+			// NOTE(review): GetPropertyMap is invoked before the nil check on
+			// endpoint.Properties below -- presumably it tolerates a nil
+			// receiver; confirm.
+			pMap, err := endpoint.Properties.GetPropertyMap()
+			if err != nil {
+				return err
+			}
+			for _, reqProp := range kamelet.Spec.Definition.Required {
+				found := false
+				if endpoint.Properties != nil {
+					if _, contains := pMap[reqProp]; contains {
+						found = true
+					}
+				}
+				if !found {
+					return fmt.Errorf("binding is missing required property %q for Kamelet %q", reqProp, key.Name)
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go
index c61a2fa..d3de5c7 100644
--- a/pkg/cmd/root.go
+++ b/pkg/cmd/root.go
@@ -147,6 +147,7 @@ func addKamelSubcommands(cmd *cobra.Command, options *RootCmdOptions) {
 	cmd.AddCommand(cmdOnly(newCmdDebug(options)))
 	cmd.AddCommand(cmdOnly(newCmdDump(options)))
 	cmd.AddCommand(newCmdLocal(options))
+	cmd.AddCommand(cmdOnly(newCmdBind(options)))
 }
 
 func addHelpSubCommands(cmd *cobra.Command, options *RootCmdOptions) error {
@@ -206,7 +207,7 @@ func checkAndShowCompatibilityWarning(ctx context.Context, c client.Client, name
 		if k8serrors.IsNotFound(err) {
 			fmt.Printf("No IntegrationPlatform resource in %s namespace\n", namespace)
 		} else {
-			fmt.Printf("Unable to retrieve the operator version: %s", err.Error())
+			fmt.Printf("Unable to retrieve the operator version: %s\n", err.Error())
 		}
 	} else {
 		if operatorVersion != "" && !compatibleVersions(operatorVersion, defaults.Version) {
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 7caea00..0b6bf6e 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -414,7 +414,7 @@ func (o *runCmdOptions) syncIntegration(cmd *cobra.Command, c client.Client, sou
 					}
 				}()
 			} else {
-				fmt.Printf("WARNING: the following URL will not be watched for changes: %s\n", s)
+				fmt.Printf("Warning: the following URL will not be watched for changes: %s\n", s)
 			}
 		}
 
diff --git a/pkg/util/uri/uri.go b/pkg/util/uri/uri.go
index 1e0cf1e..542eee4 100644
--- a/pkg/util/uri/uri.go
+++ b/pkg/util/uri/uri.go
@@ -27,8 +27,15 @@ import (
 	"github.com/apache/camel-k/pkg/util/log"
 )
 
+var uriRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:.*$`)
+
 var queryExtractorRegexp = `^[^?]+\?(?:|.*[&])%s=([^&]+)(?:[&].*|$)`
 
+// HasCamelURIFormat tells if a given string may belong to a Camel URI, without checking any catalog
+func HasCamelURIFormat(uri string) bool {
+	return uriRegexp.MatchString(uri)
+}
+
 // GetComponent returns the Camel component used in the URI
 func GetComponent(uri string) string {
 	parts := strings.Split(uri, ":")
diff --git a/pkg/util/uri/uri_test.go b/pkg/util/uri/uri_test.go
index ae22ff8..19dcdca 100644
--- a/pkg/util/uri/uri_test.go
+++ b/pkg/util/uri/uri_test.go
@@ -142,3 +142,40 @@ func TestAppendParameters(t *testing.T) {
 		})
 	}
 }
+
+func TestCamelURIFormat(t *testing.T) {
+	tests := []struct {
+		uri     string
+		invalid bool
+	}{
+		{
+			uri: "knative:channnel",
+		},
+		{
+			uri: "atomix-value:",
+		},
+		{
+			uri: "aws-ec2:",
+		},
+		{
+			uri: "coap+tcp:",
+		},
+		{
+			uri: "solrCloud:",
+		},
+		{
+			uri:     "PostgreSQL:db",
+			invalid: true,
+		},
+		{
+			uri:     "postgres.org/v1alpha1:PostgreSQL:db",
+			invalid: true,
+		},
+	}
+
+	for i, tc := range tests {
+		t.Run(fmt.Sprintf("%d-%s", i, tc.uri), func(t *testing.T) {
+			assert.Equal(t, !tc.invalid, HasCamelURIFormat(tc.uri))
+		})
+	}
+}
