This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 5505bec203dbb40b6bbaadb3b2db226a86cf5074
Author: nicolaferraro <[email protected]>
AuthorDate: Mon Mar 29 10:28:34 2021 +0200

    Fix #2083: add kamel bind command
---
 pkg/cmd/bind.go          | 307 +++++++++++++++++++++++++++++++++++++++++++++++
 pkg/cmd/root.go          |   3 +-
 pkg/cmd/run.go           |   2 +-
 pkg/util/uri/uri.go      |   7 ++
 pkg/util/uri/uri_test.go |  37 ++++++
 5 files changed, 354 insertions(+), 2 deletions(-)

diff --git a/pkg/cmd/bind.go b/pkg/cmd/bind.go
new file mode 100644
index 0000000..320ffb4
--- /dev/null
+++ b/pkg/cmd/bind.go
@@ -0,0 +1,307 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cmd
+
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "strings"
+
+       v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
+       "github.com/apache/camel-k/pkg/util/reference"
+       "github.com/apache/camel-k/pkg/util/uri"
+       "github.com/spf13/cobra"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// newCmdBind --
+func newCmdBind(rootCmdOptions *RootCmdOptions) (*cobra.Command, 
*bindCmdOptions) {
+       options := bindCmdOptions{
+               RootCmdOptions: rootCmdOptions,
+       }
+       cmd := cobra.Command{
+               Use:     "bind [source] [sink] ...",
+               Short:   "Bind Kubernetes resources, such as Kamelets, in an 
integration flow. Endpoints are expected in the format 
\"[[apigroup/]version:]kind:[namespace/]name\" or plain Camel URIs.",
+               PreRunE: decode(&options),
+               RunE: func(cmd *cobra.Command, args []string) error {
+                       if err := options.validate(cmd, args); err != nil {
+                               return err
+                       }
+                       if err := options.run(args); err != nil {
+                               fmt.Println(err.Error())
+                       }
+
+                       return nil
+               },
+       }
+
+       cmd.Flags().String("name", "", "Name for the binding")
+       cmd.Flags().StringP("output", "o", "", "Output format. One of: 
json|yaml")
+       cmd.Flags().StringArrayP("property", "p", nil, `Add a binding property 
in the form of "source.<key>=<value>" or "sink.<key>=<value>"`)
+       cmd.Flags().Bool("skip-checks", false, "Do not verify the binding for 
compliance with Kamelets and other Kubernetes resources")
+
+       return &cmd, &options
+}
+
+const (
+       sourceKey = "source"
+       sinkKey   = "sink"
+)
+
+type bindCmdOptions struct {
+       *RootCmdOptions
+       Name         string   `mapstructure:"name" yaml:",omitempty"`
+       OutputFormat string   `mapstructure:"output" yaml:",omitempty"`
+       Properties   []string `mapstructure:"properties" yaml:",omitempty"`
+       SkipChecks   bool     `mapstructure:"skip-checks" yaml:",omitempty"`
+}
+
+func (o *bindCmdOptions) validate(cmd *cobra.Command, args []string) error {
+       if len(args) > 2 {
+               return errors.New("too many arguments: expected source and 
sink")
+       } else if len(args) < 2 {
+               return errors.New("source or sink arguments are missing")
+       }
+
+       for _, p := range o.Properties {
+               if _, _, _, err := o.parseProperty(p); err != nil {
+                       return err
+               }
+       }
+
+       if !o.SkipChecks {
+               source, err := o.decode(args[0], sourceKey)
+               if err != nil {
+                       return err
+               }
+               if err := o.checkCompliance(cmd, source); err != nil {
+                       return err
+               }
+
+               sink, err := o.decode(args[1], sinkKey)
+               if err != nil {
+                       return err
+               }
+               if err := o.checkCompliance(cmd, sink); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (o *bindCmdOptions) run(args []string) error {
+       source, err := o.decode(args[0], sourceKey)
+       if err != nil {
+               return err
+       }
+
+       sink, err := o.decode(args[1], sinkKey)
+       if err != nil {
+               return err
+       }
+
+       name := o.nameFor(source, sink)
+
+       binding := v1alpha1.KameletBinding{
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: o.Namespace,
+                       Name:      name,
+               },
+               Spec: v1alpha1.KameletBindingSpec{
+                       Source: source,
+                       Sink:   sink,
+               },
+       }
+
+       switch o.OutputFormat {
+       case "":
+               // continue..
+       case "yaml":
+               data, err := kubernetes.ToYAML(&binding)
+               if err != nil {
+                       return err
+               }
+               fmt.Print(string(data))
+               return nil
+
+       case "json":
+               data, err := kubernetes.ToJSON(&binding)
+               if err != nil {
+                       return err
+               }
+               fmt.Print(string(data))
+               return nil
+
+       default:
+               return fmt.Errorf("invalid output format option '%s', should be 
one of: yaml|json", o.OutputFormat)
+       }
+
+       client, err := o.GetCmdClient()
+       if err != nil {
+               return err
+       }
+
+       existed := false
+       err = client.Create(o.Context, &binding)
+       if err != nil && k8serrors.IsAlreadyExists(err) {
+               existed = true
+               err = kubernetes.ReplaceResource(o.Context, client, &binding)
+       }
+       if err != nil {
+               return err
+       }
+
+       if !existed {
+               fmt.Printf("kamelet binding \"%s\" created\n", name)
+       } else {
+               fmt.Printf("kamelet binding \"%s\" updated\n", name)
+       }
+       return nil
+}
+
+func (o *bindCmdOptions) decode(res string, key string) (v1alpha1.Endpoint, 
error) {
+       refConverter := reference.NewConverter(reference.KameletPrefix)
+       endpoint := v1alpha1.Endpoint{}
+       props, err := o.asEndpointProperties(o.getProperties(key))
+       if err != nil {
+               return endpoint, err
+       }
+       endpoint.Properties = props
+
+       ref, err := refConverter.FromString(res)
+       if err != nil {
+               if uri.HasCamelURIFormat(res) {
+                       endpoint.URI = &res
+                       return endpoint, nil
+               }
+               return endpoint, err
+       }
+       endpoint.Ref = &ref
+       if endpoint.Ref.Namespace == "" {
+               endpoint.Ref.Namespace = o.Namespace
+       }
+       return endpoint, nil
+}
+
+func (o *bindCmdOptions) nameFor(source, sink v1alpha1.Endpoint) string {
+       if o.Name != "" {
+               return o.Name
+       }
+       sourcePart := o.nameForEndpoint(source)
+       sinkPart := o.nameForEndpoint(sink)
+       name := fmt.Sprintf("%s-to-%s", sourcePart, sinkPart)
+       return kubernetes.SanitizeName(name)
+}
+
+func (o *bindCmdOptions) nameForEndpoint(endpoint v1alpha1.Endpoint) string {
+       if endpoint.URI != nil {
+               return uri.GetComponent(*endpoint.URI)
+       }
+       if endpoint.Ref != nil {
+               return endpoint.Ref.Name
+       }
+       return ""
+}
+
+func (o *bindCmdOptions) asEndpointProperties(props map[string]string) 
(*v1alpha1.EndpointProperties, error) {
+       if len(props) == 0 {
+               return nil, nil
+       }
+       data, err := json.Marshal(props)
+       if err != nil {
+               return nil, err
+       }
+       return &v1alpha1.EndpointProperties{
+               RawMessage: v1.RawMessage(data),
+       }, nil
+}
+
+func (o *bindCmdOptions) getProperties(refType string) map[string]string {
+       props := make(map[string]string)
+       for _, p := range o.Properties {
+               tp, k, v, err := o.parseProperty(p)
+               if err != nil {
+                       continue
+               }
+               if tp == refType {
+                       props[k] = v
+               }
+       }
+       return props
+}
+
+func (o *bindCmdOptions) parseProperty(prop string) (string, string, string, 
error) {
+       parts := strings.SplitN(prop, "=", 2)
+       if len(parts) != 2 {
+               return "", "", "", fmt.Errorf(`property %q does not follow 
format "[source|sink].<key>=<value>"`, prop)
+       }
+       keyParts := strings.SplitN(parts[0], ".", 2)
+       if len(keyParts) != 2 {
+               return "", "", "", fmt.Errorf(`property key %q does not follow 
format "[source|sink].<key>"`, parts[0])
+       }
+       if keyParts[0] != sourceKey && keyParts[0] != sinkKey {
+               return "", "", "", fmt.Errorf(`property key %q does not start 
with "source." or "sink."`, parts[0])
+       }
+       return keyParts[0], keyParts[1], parts[1], nil
+}
+
+func (o *bindCmdOptions) checkCompliance(cmd *cobra.Command, endpoint 
v1alpha1.Endpoint) error {
+       if endpoint.Ref != nil && endpoint.Ref.Kind == "Kamelet" {
+               c, err := o.GetCmdClient()
+               if err != nil {
+                       return err
+               }
+               key := client.ObjectKey{
+                       Namespace: endpoint.Ref.Namespace,
+                       Name:      endpoint.Ref.Name,
+               }
+               kamelet := v1alpha1.Kamelet{}
+               if err := c.Get(o.Context, key, &kamelet); err != nil {
+                       if k8serrors.IsNotFound(err) {
+                               // Kamelet may be in the operator namespace, 
but we currently don't have a way to determine it: we just warn
+                               fmt.Fprintf(cmd.OutOrStderr(), "Warning: 
Kamelet %q not found in namespace %q\n", key.Name, key.Namespace)
+                               return nil
+                       }
+                       return err
+               }
+               if kamelet.Spec.Definition != nil && 
len(kamelet.Spec.Definition.Required) > 0 {
+                       pMap, err := endpoint.Properties.GetPropertyMap()
+                       if err != nil {
+                               return err
+                       }
+                       for _, reqProp := range 
kamelet.Spec.Definition.Required {
+                               found := false
+                               if endpoint.Properties != nil {
+                                       if _, contains := pMap[reqProp]; 
contains {
+                                               found = true
+                                       }
+                               }
+                               if !found {
+                                       return fmt.Errorf("binding is missing 
required property %q for Kamelet %q", reqProp, key.Name)
+                               }
+                       }
+               }
+       }
+       return nil
+}
diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go
index c61a2fa..d3de5c7 100644
--- a/pkg/cmd/root.go
+++ b/pkg/cmd/root.go
@@ -147,6 +147,7 @@ func addKamelSubcommands(cmd *cobra.Command, options 
*RootCmdOptions) {
        cmd.AddCommand(cmdOnly(newCmdDebug(options)))
        cmd.AddCommand(cmdOnly(newCmdDump(options)))
        cmd.AddCommand(newCmdLocal(options))
+       cmd.AddCommand(cmdOnly(newCmdBind(options)))
 }
 
 func addHelpSubCommands(cmd *cobra.Command, options *RootCmdOptions) error {
@@ -206,7 +207,7 @@ func checkAndShowCompatibilityWarning(ctx context.Context, 
c client.Client, name
                if k8serrors.IsNotFound(err) {
                        fmt.Printf("No IntegrationPlatform resource in %s 
namespace\n", namespace)
                } else {
-                       fmt.Printf("Unable to retrieve the operator version: 
%s", err.Error())
+                       fmt.Printf("Unable to retrieve the operator version: 
%s\n", err.Error())
                }
        } else {
                if operatorVersion != "" && 
!compatibleVersions(operatorVersion, defaults.Version) {
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 7caea00..0b6bf6e 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -414,7 +414,7 @@ func (o *runCmdOptions) syncIntegration(cmd *cobra.Command, 
c client.Client, sou
                                }
                        }()
                } else {
-                       fmt.Printf("WARNING: the following URL will not be 
watched for changes: %s\n", s)
+                       fmt.Printf("Warning: the following URL will not be 
watched for changes: %s\n", s)
                }
        }
 
diff --git a/pkg/util/uri/uri.go b/pkg/util/uri/uri.go
index 1e0cf1e..542eee4 100644
--- a/pkg/util/uri/uri.go
+++ b/pkg/util/uri/uri.go
@@ -27,8 +27,15 @@ import (
        "github.com/apache/camel-k/pkg/util/log"
 )
 
+var uriRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:.*$`)
+
 var queryExtractorRegexp = `^[^?]+\?(?:|.*[&])%s=([^&]+)(?:[&].*|$)`
 
+// HasCamelURIFormat tells if a given string may belong to a Camel URI, 
without checking any catalog
+func HasCamelURIFormat(uri string) bool {
+       return uriRegexp.MatchString(uri)
+}
+
 // GetComponent returns the Camel component used in the URI
 func GetComponent(uri string) string {
        parts := strings.Split(uri, ":")
diff --git a/pkg/util/uri/uri_test.go b/pkg/util/uri/uri_test.go
index ae22ff8..19dcdca 100644
--- a/pkg/util/uri/uri_test.go
+++ b/pkg/util/uri/uri_test.go
@@ -142,3 +142,40 @@ func TestAppendParameters(t *testing.T) {
                })
        }
 }
+
+func TestCamelURIFormat(t *testing.T) {
+       tests := []struct {
+               uri     string
+               invalid bool
+       }{
+               {
+                       uri: "knative:channnel",
+               },
+               {
+                       uri: "atomix-value:",
+               },
+               {
+                       uri: "aws-ec2:",
+               },
+               {
+                       uri: "coap+tcp:",
+               },
+               {
+                       uri: "solrCloud:",
+               },
+               {
+                       uri:     "PostgreSQL:db",
+                       invalid: true,
+               },
+               {
+                       uri:     "postgres.org/v1alpha1:PostgreSQL:db",
+                       invalid: true,
+               },
+       }
+
+       for i, tc := range tests {
+               t.Run(fmt.Sprintf("%d-%s", i, tc.uri), func(t *testing.T) {
+                       assert.Equal(t, !tc.invalid, HasCamelURIFormat(tc.uri))
+               })
+       }
+}

Reply via email to