This is an automated email from the ASF dual-hosted git repository.

nferraro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 0fbbdd93342bcfb6e654723766d3144486961d5a
Author: Nicola Ferraro <[email protected]>
AuthorDate: Fri Jul 10 12:24:42 2020 +0200

    kamelet: complete e2e example
---
 e2e/knative/knative_platform_test.go               |  4 +-
 examples/kamelets/fake-usage.groovy                | 21 +++++++++
 examples/kamelets/usage.groovy                     | 20 ++++++++
 go.sum                                             |  2 +
 pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go   |  2 +-
 pkg/cmd/run.go                                     |  2 +-
 pkg/controller/kamelet/kamelet_controller.go       |  1 +
 .../flow_test.go => controller/kamelet/monitor.go} | 53 ++++++++++------------
 pkg/metadata/metadata.go                           | 28 ++----------
 pkg/trait/dependencies.go                          |  7 ++-
 pkg/trait/init.go                                  |  2 +-
 pkg/trait/kamelets.go                              | 33 ++++++++++----
 pkg/trait/kamelets_test.go                         | 22 ++++-----
 pkg/trait/knative.go                               | 43 +++++++++++++-----
 pkg/util/digest/digest.go                          |  2 +-
 pkg/util/flow/flow.go                              | 14 +++---
 pkg/util/flow/flow_test.go                         |  4 +-
 pkg/util/kubernetes/resolver.go                    | 19 +++++++-
 18 files changed, 175 insertions(+), 104 deletions(-)

diff --git a/e2e/knative/knative_platform_test.go 
b/e2e/knative/knative_platform_test.go
index 857a72f..c32711d 100644
--- a/e2e/knative/knative_platform_test.go
+++ b/e2e/knative/knative_platform_test.go
@@ -58,10 +58,10 @@ func TestKnativePlatformTest(t *testing.T) {
                        // Change something in the integration to produce a 
redeploy
                        Expect(UpdateIntegration(ns, "yaml", func(it 
*v1.Integration) {
                                it.Spec.Profile = ""
-                               content, err := flow.Marshal(it.Spec.Flows)
+                               content, err := flow.ToYamlDSL(it.Spec.Flows)
                                assert.NoError(t, err)
                                newData := strings.ReplaceAll(string(content), 
"string!", "string!!!")
-                               newFlows, err := flow.UnmarshalString(newData)
+                               newFlows, err := flow.FromYamlDSLString(newData)
                                assert.NoError(t, err)
                                it.Spec.Flows = newFlows
                        })).To(BeNil())
diff --git a/examples/kamelets/fake-usage.groovy 
b/examples/kamelets/fake-usage.groovy
new file mode 100755
index 0000000..bff523b
--- /dev/null
+++ b/examples/kamelets/fake-usage.groovy
@@ -0,0 +1,21 @@
+// camel-k: language=groovy trait=kamelets.list=timer
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// The integration should contain the kamelet as source
+
+// Until the kamelet component is added in runtime
\ No newline at end of file
diff --git a/examples/kamelets/usage.groovy b/examples/kamelets/usage.groovy
new file mode 100755
index 0000000..15090c9
--- /dev/null
+++ b/examples/kamelets/usage.groovy
@@ -0,0 +1,20 @@
+// camel-k: language=groovy
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+from('kamelet:timer')
+    .to('log:info')
diff --git a/go.sum b/go.sum
index 40b5e55..34d32df 100644
--- a/go.sum
+++ b/go.sum
@@ -984,6 +984,8 @@ github.com/opencontainers/runc 
v1.0.0-rc2.0.20190611121236-6cc515888830/go.mod h
 github.com/opencontainers/runtime-spec 
v0.1.2-0.20190507144316-5b71a03e2700/go.mod 
h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
 github.com/opencontainers/runtime-tools 
v0.0.0-20181011054405-1d69bd0f9c39/go.mod 
h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs=
 github.com/openshift/api v0.0.0-20200205133042-34f0ec8dab87/go.mod 
h1:fT6U/JfG8uZzemTRwZA2kBDJP5nWz7v05UHnty/D+pk=
+github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f 
h1:ATPK7UhEwglONJc8qGsq41TbPk0XA4Kpm7XZZ3mlhAY=
+github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f/go.mod 
h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY=
 github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible 
h1:YwFnUQ5RQ17CmkxHyjpQnWAQOGkLKXY0shOUEyqaCGk=
 github.com/openshift/api 
v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible/go.mod 
h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY=
 github.com/openshift/client-go v0.0.0-20190923180330-3b6373338c9b/go.mod 
h1:6rzn+JTr7+WYS2E1TExP4gByoABxMznR6y2SnUIkmxk=
diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go 
b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
index 1292dc3..e6c991c 100644
--- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go
@@ -477,7 +477,7 @@ func (in *KameletSpec) DeepCopyInto(out *KameletSpec) {
        if in.Flow != nil {
                in, out := &in.Flow, &out.Flow
                *out = new(v1.Flow)
-               **out = **in
+               (*in).DeepCopyInto(*out)
        }
        out.Authorization = in.Authorization
        if in.Types != nil {
diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go
index 5c7e4c3..744f635 100644
--- a/pkg/cmd/run.go
+++ b/pkg/cmd/run.go
@@ -498,7 +498,7 @@ func (o *runCmdOptions) updateIntegrationCode(c 
client.Client, sources []string,
                }
 
                if o.UseFlows && (strings.HasSuffix(source, ".yaml") || 
strings.HasSuffix(source, ".yml")) {
-                       flows, err := flow.UnmarshalString(data)
+                       flows, err := flow.FromYamlDSLString(data)
                        if err != nil {
                                return nil, err
                        }
diff --git a/pkg/controller/kamelet/kamelet_controller.go 
b/pkg/controller/kamelet/kamelet_controller.go
index 048cf22..baad186 100644
--- a/pkg/controller/kamelet/kamelet_controller.go
+++ b/pkg/controller/kamelet/kamelet_controller.go
@@ -129,6 +129,7 @@ func (r *ReconcileKamelet) Reconcile(request 
reconcile.Request) (reconcile.Resul
 
        actions := []Action{
                NewInitializeAction(),
+               NewMonitorAction(),
        }
 
        var targetPhase v1alpha1.KameletPhase
diff --git a/pkg/util/flow/flow_test.go b/pkg/controller/kamelet/monitor.go
similarity index 52%
copy from pkg/util/flow/flow_test.go
copy to pkg/controller/kamelet/monitor.go
index c9248cb..49935d2 100644
--- a/pkg/util/flow/flow_test.go
+++ b/pkg/controller/kamelet/monitor.go
@@ -15,39 +15,32 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-package flow
+package kamelet
 
 import (
-       "bytes"
-       "encoding/json"
-       "testing"
+       "context"
 
-       "github.com/stretchr/testify/assert"
+       "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
 )
 
-func TestReadWriteYaml(t *testing.T) {
-       // yaml in conventional form as marshalled by the go runtime
-       yaml := `- from:
-    steps:
-    - to: log:info
-    uri: timer:tick
-`
-
-       yamlReader := bytes.NewReader([]byte(yaml))
-       flows, err := Unmarshal(yamlReader)
-       assert.NoError(t, err)
-       assert.NotNil(t, flows)
-       assert.Len(t, flows, 1)
-
-       flow := map[string]interface{}{}
-       err = json.Unmarshal(flows[0].RawMessage, &flow)
-       assert.NoError(t, err)
-
-       assert.NotNil(t, flow["from"])
-       assert.Nil(t, flow["xx"])
-
-       data, err := Marshal(flows)
-       assert.NoError(t, err)
-       assert.NotNil(t, data)
-       assert.Equal(t, yaml, string(data))
+// NewMonitorAction returns an action that monitors the kamelet after it's 
fully initialized
+func NewMonitorAction() Action {
+       return &monitorAction{}
+}
+
+type monitorAction struct {
+       baseAction
+}
+
+func (action *monitorAction) Name() string {
+       return "monitor"
+}
+
+func (action *monitorAction) CanHandle(kamelet *v1alpha1.Kamelet) bool {
+       return kamelet.Status.Phase == v1alpha1.KameletPhaseReady
+}
+
+func (action *monitorAction) Handle(ctx context.Context, kamelet 
*v1alpha1.Kamelet) (*v1alpha1.Kamelet, error) {
+       // Doing nothing for now
+       return kamelet, nil
 }
diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go
index 27a2c8d..03c8ca1 100644
--- a/pkg/metadata/metadata.go
+++ b/pkg/metadata/metadata.go
@@ -22,9 +22,6 @@ import (
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        "github.com/apache/camel-k/pkg/util/camel"
-       "github.com/apache/camel-k/pkg/util/gzip"
-       "github.com/apache/camel-k/pkg/util/log"
-
        src "github.com/apache/camel-k/pkg/util/source"
 )
 
@@ -64,10 +61,11 @@ func merge(m1 src.Metadata, m2 src.Metadata) src.Metadata {
 
 // Extract returns metadata information from the source code
 func Extract(catalog *camel.RuntimeCatalog, source v1.SourceSpec) 
IntegrationMetadata {
-       var err error
-       source, err = uncompress(source)
-       if err != nil {
-               log.Errorf(err, "unable to uncompress source %s: %v", 
source.Name, err)
+       if source.ContentRef != "" {
+               panic("source must be dereferenced before calling this method")
+       }
+       if source.Compression {
+               panic("source must be uncompressed before calling this method")
        }
 
        language := source.InferLanguage()
@@ -94,19 +92,3 @@ func Each(catalog *camel.RuntimeCatalog, sources 
[]v1.SourceSpec, consumer func(
                }
        }
 }
-
-func uncompress(spec v1.SourceSpec) (v1.SourceSpec, error) {
-       if spec.Compression {
-               data := []byte(spec.Content)
-               var uncompressed []byte
-               var err error
-               if uncompressed, err = gzip.UncompressBase64(data); err != nil {
-                       return spec, err
-               }
-               newSpec := spec
-               newSpec.Compression = false
-               newSpec.Content = string(uncompressed)
-               return newSpec, nil
-       }
-       return spec, nil
-}
diff --git a/pkg/trait/dependencies.go b/pkg/trait/dependencies.go
index 8a13087..4a704b6 100644
--- a/pkg/trait/dependencies.go
+++ b/pkg/trait/dependencies.go
@@ -19,6 +19,7 @@ package trait
 
 import (
        "fmt"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
 
        "github.com/apache/camel-k/pkg/metadata"
 
@@ -66,7 +67,11 @@ func (t *dependenciesTrait) Apply(e *Environment) error {
                dependencies.Add(fmt.Sprintf("mvn:%s/%s", d.GroupID, 
d.ArtifactID))
        }
 
-       for _, s := range e.Integration.Sources() {
+       sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, 
e.Integration, e.Resources)
+       if err != nil {
+               return err
+       }
+       for _, s := range sources {
                meta := metadata.Extract(e.CamelCatalog, s)
                lang := s.InferLanguage()
 
diff --git a/pkg/trait/init.go b/pkg/trait/init.go
index 32236e9..9685832 100644
--- a/pkg/trait/init.go
+++ b/pkg/trait/init.go
@@ -54,7 +54,7 @@ func (t *initTrait) Apply(e *Environment) error {
 
                // Flows need to be turned into a generated source
                if len(e.Integration.Spec.Flows) > 0 {
-                       content, err := flow.Marshal(e.Integration.Spec.Flows)
+                       content, err := flow.ToYamlDSL(e.Integration.Spec.Flows)
                        if err != nil {
                                return err
                        }
diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go
index 80d4bc2..994ddcb 100644
--- a/pkg/trait/kamelets.go
+++ b/pkg/trait/kamelets.go
@@ -20,6 +20,8 @@ package trait
 import (
        "encoding/json"
        "fmt"
+       "github.com/apache/camel-k/pkg/util/flow"
+       "github.com/apache/camel-k/pkg/util/kubernetes"
        "regexp"
        "sort"
        "strconv"
@@ -83,10 +85,18 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, 
error) {
                return false, nil
        }
 
+       if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
+               return false, nil
+       }
+
        if t.Auto == nil || *t.Auto {
                if t.List == "" {
                        var kamelets []string
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.FromURIs))
                                util.StringSliceUniqueConcat(&kamelets, 
extractKamelets(meta.ToURIs))
                                return true
@@ -121,6 +131,10 @@ func (t *kameletsTrait) addKamelets(e *Environment) error {
                        return err
                }
 
+               if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
+                       return fmt.Errorf("kamelet %q is not %s: %s", k, 
v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
+               }
+
                if err := t.addKameletAsSource(e, kamelet); err != nil {
                        return err
                }
@@ -137,21 +151,20 @@ func (t *kameletsTrait) addKameletAsSource(e 
*Environment, kamelet v1alpha1.Kame
        var sources []v1.SourceSpec
 
        if kamelet.Spec.Flow != nil {
-               // TODO fixme removed for changes to Flow
-               //flowData, err := flows.Marshal([]v1.Flow{*kamelet.Spec.Flow})
-               //if err != nil {
-               //      return err
-               //}
+
+               flowData, err := flow.ToYamlDSL([]v1.Flow{*kamelet.Spec.Flow})
+               if err != nil {
+                       return err
+               }
                flowSource := v1.SourceSpec{
                        DataSpec: v1.DataSpec{
-                               Name: fmt.Sprintf("%s.yaml", kamelet.Name),
-                               //Content: string(flowData),
-                               Content: string(*kamelet.Spec.Flow),
+                               Name:    fmt.Sprintf("%s.yaml", kamelet.Name),
+                               Content: string(flowData),
                        },
                        Language: v1.LanguageYaml,
                        Type:     v1.SourceTypeKamelet,
                }
-               flowSource, err := integrationSourceFromKameletSource(e, 
kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, 
kamelet.Name))
+               flowSource, err = integrationSourceFromKameletSource(e, 
kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, 
kamelet.Name))
                if err != nil {
                        return err
                }
diff --git a/pkg/trait/kamelets_test.go b/pkg/trait/kamelets_test.go
index 59657be..d2a1321 100644
--- a/pkg/trait/kamelets_test.go
+++ b/pkg/trait/kamelets_test.go
@@ -19,7 +19,7 @@ package trait
 
 import (
        "context"
-       "gopkg.in/yaml.v2"
+       "encoding/json"
        "testing"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
@@ -88,7 +88,7 @@ func TestKameletLookup(t *testing.T) {
                        Name:      "timer",
                },
                Spec: v1alpha1.KameletSpec{
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "timer:tick",
                                },
@@ -131,7 +131,7 @@ func TestKameletSecondarySourcesLookup(t *testing.T) {
                        Name:      "timer",
                },
                Spec: v1alpha1.KameletSpec{
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "timer:tick",
                                },
@@ -236,7 +236,7 @@ func TestErrorMultipleKameletSources(t *testing.T) {
                                        Type: v1.SourceTypeKamelet,
                                },
                        },
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "timer:tick",
                                },
@@ -265,7 +265,7 @@ func TestMultipleKamelets(t *testing.T) {
                        Name:      "timer",
                },
                Spec: v1alpha1.KameletSpec{
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "timer:tick",
                                },
@@ -290,7 +290,7 @@ func TestMultipleKamelets(t *testing.T) {
                        Name:      "logger",
                },
                Spec: v1alpha1.KameletSpec{
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "tbd:endpoint",
                                        "steps": []interface{}{
@@ -358,7 +358,7 @@ func TestKameletConfigLookup(t *testing.T) {
                        Name:      "timer",
                },
                Spec: v1alpha1.KameletSpec{
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "timer:tick",
                                },
@@ -420,7 +420,7 @@ func TestKameletNamedConfigLookup(t *testing.T) {
                        Name:      "timer",
                },
                Spec: v1alpha1.KameletSpec{
-                       Flow: deleteMeAtSomePoint(map[string]interface{}{
+                       Flow: marshalOrFail(map[string]interface{}{
                                "from": map[string]interface{}{
                                        "uri": "timer:tick",
                                },
@@ -511,11 +511,11 @@ func createKameletsTestEnvironment(flow string, objects 
...runtime.Object) (*kam
        return trait, environment
 }
 
-func deleteMeAtSomePoint(flow map[string]interface{}) *v1.Flow {
-       data, err := yaml.Marshal(flow)
+func marshalOrFail(flow map[string]interface{}) *v1.Flow {
+       data, err := json.Marshal(flow)
        if err != nil {
                panic(err)
        }
-       f := v1.Flow(data)
+       f := v1.Flow{data}
        return &f
 }
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index 92103f4..5ff9c46 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -23,6 +23,7 @@ import (
        "reflect"
        "strings"
 
+       "github.com/apache/camel-k/pkg/util/kubernetes"
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
        knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative"
        "github.com/apache/camel-k/pkg/metadata"
@@ -110,8 +111,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, 
error) {
        if t.Auto == nil || *t.Auto {
                if len(t.ChannelSources) == 0 {
                        items := make([]string, 0)
-
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                items = append(items, 
knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeChannel)...)
                                return true
                        })
@@ -120,8 +124,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, 
error) {
                }
                if len(t.ChannelSinks) == 0 {
                        items := make([]string, 0)
-
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                items = append(items, 
knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeChannel)...)
                                return true
                        })
@@ -130,8 +137,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, 
error) {
                }
                if len(t.EndpointSources) == 0 {
                        items := make([]string, 0)
-
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                items = append(items, 
knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEndpoint)...)
                                return true
                        })
@@ -140,8 +150,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, 
error) {
                }
                if len(t.EndpointSinks) == 0 {
                        items := make([]string, 0)
-
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                items = append(items, 
knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEndpoint)...)
                                return true
                        })
@@ -150,8 +163,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, 
error) {
                }
                if len(t.EventSources) == 0 {
                        items := make([]string, 0)
-
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                items = append(items, 
knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEvent)...)
                                return true
                        })
@@ -160,8 +176,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, 
error) {
                }
                if len(t.EventSinks) == 0 {
                        items := make([]string, 0)
-
-                       metadata.Each(e.CamelCatalog, e.Integration.Sources(), 
func(_ int, meta metadata.IntegrationMetadata) bool {
+                       sources, err := 
kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
+                       if err != nil {
+                               return false, err
+                       }
+                       metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
                                items = append(items, 
knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEvent)...)
                                return true
                        })
diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go
index 6ccd3ed..4b2dfba 100644
--- a/pkg/util/digest/digest.go
+++ b/pkg/util/digest/digest.go
@@ -71,7 +71,7 @@ func ComputeForIntegration(integration *v1.Integration) 
(string, error) {
 
        // Integration flows
        if len(integration.Spec.Flows) > 0 {
-               flows, err := flow.Marshal(integration.Spec.Flows)
+               flows, err := flow.ToYamlDSL(integration.Spec.Flows)
                if err != nil {
                        return "", err
                }
diff --git a/pkg/util/flow/flow.go b/pkg/util/flow/flow.go
index e495d55..fa963cf 100644
--- a/pkg/util/flow/flow.go
+++ b/pkg/util/flow/flow.go
@@ -31,13 +31,13 @@ import (
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
 )
 
-// UnmarshalString reads flows contained in a string
-func UnmarshalString(flowsString string) ([]v1.Flow, error) {
-       return Unmarshal(bytes.NewReader([]byte(flowsString)))
+// FromYamlDSLString creates a slice of flows from a Camel YAML DSL string
+func FromYamlDSLString(flowsString string) ([]v1.Flow, error) {
+       return FromYamlDSL(bytes.NewReader([]byte(flowsString)))
 }
 
-// Unmarshal flows from a stream
-func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
+// FromYamlDSL creates a slice of flows from a Camel YAML DSL stream
+func FromYamlDSL(reader io.Reader) ([]v1.Flow, error) {
        buffered, err := ioutil.ReadAll(reader)
        if err != nil {
                return nil, err
@@ -56,8 +56,8 @@ func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
        return flows, err
 }
 
-// Marshal flows as byte array
-func Marshal(flows []v1.Flow) ([]byte, error) {
+// ToYamlDSL converts a flow into its Camel YAML DSL equivalent
+func ToYamlDSL(flows []v1.Flow) ([]byte, error) {
        data, err := json.Marshal(&flows)
        if err != nil {
                return nil, err
diff --git a/pkg/util/flow/flow_test.go b/pkg/util/flow/flow_test.go
index c9248cb..731e2ff 100644
--- a/pkg/util/flow/flow_test.go
+++ b/pkg/util/flow/flow_test.go
@@ -34,7 +34,7 @@ func TestReadWriteYaml(t *testing.T) {
 `
 
        yamlReader := bytes.NewReader([]byte(yaml))
-       flows, err := Unmarshal(yamlReader)
+       flows, err := FromYamlDSL(yamlReader)
        assert.NoError(t, err)
        assert.NotNil(t, flows)
        assert.Len(t, flows, 1)
@@ -46,7 +46,7 @@ func TestReadWriteYaml(t *testing.T) {
        assert.NotNil(t, flow["from"])
        assert.Nil(t, flow["xx"])
 
-       data, err := Marshal(flows)
+       data, err := ToYamlDSL(flows)
        assert.NoError(t, err)
        assert.NotNil(t, data)
        assert.Equal(t, yaml, string(data))
diff --git a/pkg/util/kubernetes/resolver.go b/pkg/util/kubernetes/resolver.go
index fbb3eff..ae7621a 100644
--- a/pkg/util/kubernetes/resolver.go
+++ b/pkg/util/kubernetes/resolver.go
@@ -22,7 +22,7 @@ import (
        "fmt"
 
        v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
-
+       "github.com/apache/camel-k/pkg/util/gzip"
        corev1 "k8s.io/api/core/v1"
        controller "sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -71,10 +71,25 @@ func Resolve(data *v1.DataSpec, mapLookup func(string) 
(*corev1.ConfigMap, error
                //
                // Replace ref source content with real content
                //
-               data.Content = cm.Data["content"]
+               key := data.ContentKey
+               if key == "" {
+                       key = "content"
+               }
+               data.Content = cm.Data[key]
                data.ContentRef = ""
        }
 
+       if data.Compression {
+               cnt := []byte(data.Content)
+               var uncompressed []byte
+               var err error
+               if uncompressed, err = gzip.UncompressBase64(cnt); err != nil {
+                       return err
+               }
+               data.Compression = false
+               data.Content = string(uncompressed)
+       }
+
        return nil
 }
 

Reply via email to