This is an automated email from the ASF dual-hosted git repository. nferraro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k.git
commit 0fbbdd93342bcfb6e654723766d3144486961d5a Author: Nicola Ferraro <[email protected]> AuthorDate: Fri Jul 10 12:24:42 2020 +0200 kamelet: complete e2e example --- e2e/knative/knative_platform_test.go | 4 +- examples/kamelets/fake-usage.groovy | 21 +++++++++ examples/kamelets/usage.groovy | 20 ++++++++ go.sum | 2 + pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go | 2 +- pkg/cmd/run.go | 2 +- pkg/controller/kamelet/kamelet_controller.go | 1 + .../flow_test.go => controller/kamelet/monitor.go} | 53 ++++++++++------------ pkg/metadata/metadata.go | 28 ++---------- pkg/trait/dependencies.go | 7 ++- pkg/trait/init.go | 2 +- pkg/trait/kamelets.go | 33 ++++++++++---- pkg/trait/kamelets_test.go | 22 ++++----- pkg/trait/knative.go | 43 +++++++++++++----- pkg/util/digest/digest.go | 2 +- pkg/util/flow/flow.go | 14 +++--- pkg/util/flow/flow_test.go | 4 +- pkg/util/kubernetes/resolver.go | 19 +++++++- 18 files changed, 175 insertions(+), 104 deletions(-) diff --git a/e2e/knative/knative_platform_test.go b/e2e/knative/knative_platform_test.go index 857a72f..c32711d 100644 --- a/e2e/knative/knative_platform_test.go +++ b/e2e/knative/knative_platform_test.go @@ -58,10 +58,10 @@ func TestKnativePlatformTest(t *testing.T) { // Change something in the integration to produce a redeploy Expect(UpdateIntegration(ns, "yaml", func(it *v1.Integration) { it.Spec.Profile = "" - content, err := flow.Marshal(it.Spec.Flows) + content, err := flow.ToYamlDSL(it.Spec.Flows) assert.NoError(t, err) newData := strings.ReplaceAll(string(content), "string!", "string!!!") - newFlows, err := flow.UnmarshalString(newData) + newFlows, err := flow.FromYamlDSLString(newData) assert.NoError(t, err) it.Spec.Flows = newFlows })).To(BeNil()) diff --git a/examples/kamelets/fake-usage.groovy b/examples/kamelets/fake-usage.groovy new file mode 100755 index 0000000..bff523b --- /dev/null +++ b/examples/kamelets/fake-usage.groovy @@ -0,0 +1,21 @@ +// camel-k: language=groovy trait=kamelets.list=timer +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// The integration should contain the kamelet as source + +// Until the kamelet component is added in runtime \ No newline at end of file diff --git a/examples/kamelets/usage.groovy b/examples/kamelets/usage.groovy new file mode 100755 index 0000000..15090c9 --- /dev/null +++ b/examples/kamelets/usage.groovy @@ -0,0 +1,20 @@ +// camel-k: language=groovy +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +from('kamelet:timer') + .to('log:info') diff --git a/go.sum b/go.sum index 40b5e55..34d32df 100644 --- a/go.sum +++ b/go.sum @@ -984,6 +984,8 @@ github.com/opencontainers/runc v1.0.0-rc2.0.20190611121236-6cc515888830/go.mod h github.com/opencontainers/runtime-spec v0.1.2-0.20190507144316-5b71a03e2700/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-tools v0.0.0-20181011054405-1d69bd0f9c39/go.mod h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs= github.com/openshift/api v0.0.0-20200205133042-34f0ec8dab87/go.mod h1:fT6U/JfG8uZzemTRwZA2kBDJP5nWz7v05UHnty/D+pk= +github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f h1:ATPK7UhEwglONJc8qGsq41TbPk0XA4Kpm7XZZ3mlhAY= +github.com/openshift/api v0.0.0-20200221181648-8ce0047d664f/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY= github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible h1:YwFnUQ5RQ17CmkxHyjpQnWAQOGkLKXY0shOUEyqaCGk= github.com/openshift/api v3.9.1-0.20190927182313-d4a64ec2cbd8+incompatible/go.mod h1:dh9o4Fs58gpFXGSYfnVxGR9PnV53I8TW84pQaJDdGiY= github.com/openshift/client-go v0.0.0-20190923180330-3b6373338c9b/go.mod h1:6rzn+JTr7+WYS2E1TExP4gByoABxMznR6y2SnUIkmxk= diff --git a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go index 1292dc3..e6c991c 100644 --- a/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/camel/v1alpha1/zz_generated.deepcopy.go @@ -477,7 +477,7 @@ func (in *KameletSpec) DeepCopyInto(out *KameletSpec) { if in.Flow != nil { in, out := &in.Flow, &out.Flow *out = new(v1.Flow) - **out = **in + (*in).DeepCopyInto(*out) } out.Authorization = in.Authorization if in.Types != nil { diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 5c7e4c3..744f635 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -498,7 +498,7 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string, } if o.UseFlows && (strings.HasSuffix(source, ".yaml") || 
strings.HasSuffix(source, ".yml")) { - flows, err := flow.UnmarshalString(data) + flows, err := flow.FromYamlDSLString(data) if err != nil { return nil, err } diff --git a/pkg/controller/kamelet/kamelet_controller.go b/pkg/controller/kamelet/kamelet_controller.go index 048cf22..baad186 100644 --- a/pkg/controller/kamelet/kamelet_controller.go +++ b/pkg/controller/kamelet/kamelet_controller.go @@ -129,6 +129,7 @@ func (r *ReconcileKamelet) Reconcile(request reconcile.Request) (reconcile.Resul actions := []Action{ NewInitializeAction(), + NewMonitorAction(), } var targetPhase v1alpha1.KameletPhase diff --git a/pkg/util/flow/flow_test.go b/pkg/controller/kamelet/monitor.go similarity index 52% copy from pkg/util/flow/flow_test.go copy to pkg/controller/kamelet/monitor.go index c9248cb..49935d2 100644 --- a/pkg/util/flow/flow_test.go +++ b/pkg/controller/kamelet/monitor.go @@ -15,39 +15,32 @@ See the License for the specific language governing permissions and limitations under the License. */ -package flow +package kamelet import ( - "bytes" - "encoding/json" - "testing" + "context" - "github.com/stretchr/testify/assert" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" ) -func TestReadWriteYaml(t *testing.T) { - // yaml in conventional form as marshalled by the go runtime - yaml := `- from: - steps: - - to: log:info - uri: timer:tick -` - - yamlReader := bytes.NewReader([]byte(yaml)) - flows, err := Unmarshal(yamlReader) - assert.NoError(t, err) - assert.NotNil(t, flows) - assert.Len(t, flows, 1) - - flow := map[string]interface{}{} - err = json.Unmarshal(flows[0].RawMessage, &flow) - assert.NoError(t, err) - - assert.NotNil(t, flow["from"]) - assert.Nil(t, flow["xx"]) - - data, err := Marshal(flows) - assert.NoError(t, err) - assert.NotNil(t, data) - assert.Equal(t, yaml, string(data)) +// NewMonitorAction returns an action that monitors the kamelet after it's fully initialized +func NewMonitorAction() Action { + return &monitorAction{} +} + +type monitorAction 
struct { + baseAction +} + +func (action *monitorAction) Name() string { + return "monitor" +} + +func (action *monitorAction) CanHandle(kamelet *v1alpha1.Kamelet) bool { + return kamelet.Status.Phase == v1alpha1.KameletPhaseReady +} + +func (action *monitorAction) Handle(ctx context.Context, kamelet *v1alpha1.Kamelet) (*v1alpha1.Kamelet, error) { + // Doing nothing for now + return kamelet, nil } diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go index 27a2c8d..03c8ca1 100644 --- a/pkg/metadata/metadata.go +++ b/pkg/metadata/metadata.go @@ -22,9 +22,6 @@ import ( v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/util/camel" - "github.com/apache/camel-k/pkg/util/gzip" - "github.com/apache/camel-k/pkg/util/log" - src "github.com/apache/camel-k/pkg/util/source" ) @@ -64,10 +61,11 @@ func merge(m1 src.Metadata, m2 src.Metadata) src.Metadata { // Extract returns metadata information from the source code func Extract(catalog *camel.RuntimeCatalog, source v1.SourceSpec) IntegrationMetadata { - var err error - source, err = uncompress(source) - if err != nil { - log.Errorf(err, "unable to uncompress source %s: %v", source.Name, err) + if source.ContentRef != "" { + panic("source must be dereferenced before calling this method") + } + if source.Compression { + panic("source must be uncompressed before calling this method") } language := source.InferLanguage() @@ -94,19 +92,3 @@ func Each(catalog *camel.RuntimeCatalog, sources []v1.SourceSpec, consumer func( } } } - -func uncompress(spec v1.SourceSpec) (v1.SourceSpec, error) { - if spec.Compression { - data := []byte(spec.Content) - var uncompressed []byte - var err error - if uncompressed, err = gzip.UncompressBase64(data); err != nil { - return spec, err - } - newSpec := spec - newSpec.Compression = false - newSpec.Content = string(uncompressed) - return newSpec, nil - } - return spec, nil -} diff --git a/pkg/trait/dependencies.go b/pkg/trait/dependencies.go index 
8a13087..4a704b6 100644 --- a/pkg/trait/dependencies.go +++ b/pkg/trait/dependencies.go @@ -19,6 +19,7 @@ package trait import ( "fmt" + "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/metadata" @@ -66,7 +67,11 @@ func (t *dependenciesTrait) Apply(e *Environment) error { dependencies.Add(fmt.Sprintf("mvn:%s/%s", d.GroupID, d.ArtifactID)) } - for _, s := range e.Integration.Sources() { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return err + } + for _, s := range sources { meta := metadata.Extract(e.CamelCatalog, s) lang := s.InferLanguage() diff --git a/pkg/trait/init.go b/pkg/trait/init.go index 32236e9..9685832 100644 --- a/pkg/trait/init.go +++ b/pkg/trait/init.go @@ -54,7 +54,7 @@ func (t *initTrait) Apply(e *Environment) error { // Flows need to be turned into a generated source if len(e.Integration.Spec.Flows) > 0 { - content, err := flow.Marshal(e.Integration.Spec.Flows) + content, err := flow.ToYamlDSL(e.Integration.Spec.Flows) if err != nil { return err } diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go index 80d4bc2..994ddcb 100644 --- a/pkg/trait/kamelets.go +++ b/pkg/trait/kamelets.go @@ -20,6 +20,8 @@ package trait import ( "encoding/json" "fmt" + "github.com/apache/camel-k/pkg/util/flow" + "github.com/apache/camel-k/pkg/util/kubernetes" "regexp" "sort" "strconv" @@ -83,10 +85,18 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, error) { return false, nil } + if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { + return false, nil + } + if t.Auto == nil || *t.Auto { if t.List == "" { var kamelets []string - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta 
metadata.IntegrationMetadata) bool { util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.FromURIs)) util.StringSliceUniqueConcat(&kamelets, extractKamelets(meta.ToURIs)) return true @@ -121,6 +131,10 @@ func (t *kameletsTrait) addKamelets(e *Environment) error { return err } + if kamelet.Status.Phase != v1alpha1.KameletPhaseReady { + return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase) + } + if err := t.addKameletAsSource(e, kamelet); err != nil { return err } @@ -137,21 +151,20 @@ func (t *kameletsTrait) addKameletAsSource(e *Environment, kamelet v1alpha1.Kame var sources []v1.SourceSpec if kamelet.Spec.Flow != nil { - // TODO fixme removed for changes to Flow - //flowData, err := flows.Marshal([]v1.Flow{*kamelet.Spec.Flow}) - //if err != nil { - // return err - //} + + flowData, err := flow.ToYamlDSL([]v1.Flow{*kamelet.Spec.Flow}) + if err != nil { + return err + } flowSource := v1.SourceSpec{ DataSpec: v1.DataSpec{ - Name: fmt.Sprintf("%s.yaml", kamelet.Name), - //Content: string(flowData), - Content: string(*kamelet.Spec.Flow), + Name: fmt.Sprintf("%s.yaml", kamelet.Name), + Content: string(flowData), }, Language: v1.LanguageYaml, Type: v1.SourceTypeKamelet, } - flowSource, err := integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name)) + flowSource, err = integrationSourceFromKameletSource(e, kamelet, flowSource, fmt.Sprintf("%s-kamelet-%s-flow", e.Integration.Name, kamelet.Name)) if err != nil { return err } diff --git a/pkg/trait/kamelets_test.go b/pkg/trait/kamelets_test.go index 59657be..d2a1321 100644 --- a/pkg/trait/kamelets_test.go +++ b/pkg/trait/kamelets_test.go @@ -19,7 +19,7 @@ package trait import ( "context" - "gopkg.in/yaml.v2" + "encoding/json" "testing" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" @@ -88,7 +88,7 @@ func TestKameletLookup(t *testing.T) { Name: "timer", }, Spec: v1alpha1.KameletSpec{ - Flow: 
deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "timer:tick", }, @@ -131,7 +131,7 @@ func TestKameletSecondarySourcesLookup(t *testing.T) { Name: "timer", }, Spec: v1alpha1.KameletSpec{ - Flow: deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "timer:tick", }, @@ -236,7 +236,7 @@ func TestErrorMultipleKameletSources(t *testing.T) { Type: v1.SourceTypeKamelet, }, }, - Flow: deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "timer:tick", }, @@ -265,7 +265,7 @@ func TestMultipleKamelets(t *testing.T) { Name: "timer", }, Spec: v1alpha1.KameletSpec{ - Flow: deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "timer:tick", }, @@ -290,7 +290,7 @@ func TestMultipleKamelets(t *testing.T) { Name: "logger", }, Spec: v1alpha1.KameletSpec{ - Flow: deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "tbd:endpoint", "steps": []interface{}{ @@ -358,7 +358,7 @@ func TestKameletConfigLookup(t *testing.T) { Name: "timer", }, Spec: v1alpha1.KameletSpec{ - Flow: deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "timer:tick", }, @@ -420,7 +420,7 @@ func TestKameletNamedConfigLookup(t *testing.T) { Name: "timer", }, Spec: v1alpha1.KameletSpec{ - Flow: deleteMeAtSomePoint(map[string]interface{}{ + Flow: marshalOrFail(map[string]interface{}{ "from": map[string]interface{}{ "uri": "timer:tick", }, @@ -511,11 +511,11 @@ func createKameletsTestEnvironment(flow string, objects ...runtime.Object) (*kam return trait, environment } -func deleteMeAtSomePoint(flow map[string]interface{}) *v1.Flow { - data, err := yaml.Marshal(flow) 
+func marshalOrFail(flow map[string]interface{}) *v1.Flow { + data, err := json.Marshal(flow) if err != nil { panic(err) } - f := v1.Flow(data) + f := v1.Flow{data} return &f } diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index 92103f4..5ff9c46 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -23,6 +23,7 @@ import ( "reflect" "strings" + "github.com/apache/camel-k/pkg/util/kubernetes" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative" "github.com/apache/camel-k/pkg/metadata" @@ -110,8 +111,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { if t.Auto == nil || *t.Auto { if len(t.ChannelSources) == 0 { items := make([]string, 0) - - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeChannel)...) return true }) @@ -120,8 +124,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { } if len(t.ChannelSinks) == 0 { items := make([]string, 0) - - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeChannel)...) 
return true }) @@ -130,8 +137,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { } if len(t.EndpointSources) == 0 { items := make([]string, 0) - - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEndpoint)...) return true }) @@ -140,8 +150,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { } if len(t.EndpointSinks) == 0 { items := make([]string, 0) - - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEndpoint)...) return true }) @@ -150,8 +163,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { } if len(t.EventSources) == 0 { items := make([]string, 0) - - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEvent)...) 
return true }) @@ -160,8 +176,11 @@ func (t *knativeTrait) Configure(e *Environment) (bool, error) { } if len(t.EventSinks) == 0 { items := make([]string, 0) - - metadata.Each(e.CamelCatalog, e.Integration.Sources(), func(_ int, meta metadata.IntegrationMetadata) bool { + sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) + if err != nil { + return false, err + } + metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEvent)...) return true }) diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go index 6ccd3ed..4b2dfba 100644 --- a/pkg/util/digest/digest.go +++ b/pkg/util/digest/digest.go @@ -71,7 +71,7 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) { // Integration flows if len(integration.Spec.Flows) > 0 { - flows, err := flow.Marshal(integration.Spec.Flows) + flows, err := flow.ToYamlDSL(integration.Spec.Flows) if err != nil { return "", err } diff --git a/pkg/util/flow/flow.go b/pkg/util/flow/flow.go index e495d55..fa963cf 100644 --- a/pkg/util/flow/flow.go +++ b/pkg/util/flow/flow.go @@ -31,13 +31,13 @@ import ( v1 "github.com/apache/camel-k/pkg/apis/camel/v1" ) -// UnmarshalString reads flows contained in a string -func UnmarshalString(flowsString string) ([]v1.Flow, error) { - return Unmarshal(bytes.NewReader([]byte(flowsString))) +// FromYamlDSLString creates a slice of flows from a Camel YAML DSL string +func FromYamlDSLString(flowsString string) ([]v1.Flow, error) { + return FromYamlDSL(bytes.NewReader([]byte(flowsString))) } -// Unmarshal flows from a stream -func Unmarshal(reader io.Reader) ([]v1.Flow, error) { +// FromYamlDSL creates a slice of flows from a Camel YAML DSL stream +func FromYamlDSL(reader io.Reader) ([]v1.Flow, error) { buffered, err := ioutil.ReadAll(reader) if err != nil { return nil, err @@ -56,8 +56,8 @@ func Unmarshal(reader 
io.Reader) ([]v1.Flow, error) { return flows, err } -// Marshal flows as byte array -func Marshal(flows []v1.Flow) ([]byte, error) { +// ToYamlDSL converts a flow into its Camel YAML DSL equivalent +func ToYamlDSL(flows []v1.Flow) ([]byte, error) { data, err := json.Marshal(&flows) if err != nil { return nil, err diff --git a/pkg/util/flow/flow_test.go b/pkg/util/flow/flow_test.go index c9248cb..731e2ff 100644 --- a/pkg/util/flow/flow_test.go +++ b/pkg/util/flow/flow_test.go @@ -34,7 +34,7 @@ func TestReadWriteYaml(t *testing.T) { ` yamlReader := bytes.NewReader([]byte(yaml)) - flows, err := Unmarshal(yamlReader) + flows, err := FromYamlDSL(yamlReader) assert.NoError(t, err) assert.NotNil(t, flows) assert.Len(t, flows, 1) @@ -46,7 +46,7 @@ func TestReadWriteYaml(t *testing.T) { assert.NotNil(t, flow["from"]) assert.Nil(t, flow["xx"]) - data, err := Marshal(flows) + data, err := ToYamlDSL(flows) assert.NoError(t, err) assert.NotNil(t, data) assert.Equal(t, yaml, string(data)) diff --git a/pkg/util/kubernetes/resolver.go b/pkg/util/kubernetes/resolver.go index fbb3eff..ae7621a 100644 --- a/pkg/util/kubernetes/resolver.go +++ b/pkg/util/kubernetes/resolver.go @@ -22,7 +22,7 @@ import ( "fmt" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" - + "github.com/apache/camel-k/pkg/util/gzip" corev1 "k8s.io/api/core/v1" controller "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -71,10 +71,25 @@ func Resolve(data *v1.DataSpec, mapLookup func(string) (*corev1.ConfigMap, error // // Replace ref source content with real content // - data.Content = cm.Data["content"] + key := data.ContentKey + if key == "" { + key = "content" + } + data.Content = cm.Data[key] data.ContentRef = "" } + if data.Compression { + cnt := []byte(data.Content) + var uncompressed []byte + var err error + if uncompressed, err = gzip.UncompressBase64(cnt); err != nil { + return err + } + data.Compression = false + data.Content = string(uncompressed) + } + return nil }
