This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
commit e5724510000d173d9d24a0f01326e58c6db288c6 Author: Christoph Deppisch <[email protected]> AuthorDate: Thu Mar 17 19:57:06 2022 +0100 chore: Add header deserialize option on Kafka source - Adds utility class to auto deserialize message headers from byte[] to String - Option must be explicitly enabled on the source Kamelet - Exclude non String Kafka headers from deserialization (kafka.HEADERS and CamelKafkaManualCommit) --- .github/workflows/yaks-tests.yaml | 1 + kamelets/kafka-source.kamelet.yaml | 21 +++- .../kafka/KafkaHeaderDeserializer.java | 89 +++++++++++++++++ .../kafka/KafkaHeaderDeserializerTest.java | 108 +++++++++++++++++++++ .../resources/kamelets/kafka-source.kamelet.yaml | 21 +++- test/kafka/install.sh | 28 ++++++ test/kafka/kafka-sink-test.yaml | 42 ++++++++ test/kafka/kafka-sink.feature | 45 +++++++++ test/kafka/kafka-source-test.yaml | 37 +++++++ test/kafka/kafka-source.feature | 66 +++++++++++++ test/kafka/uninstall.sh | 22 +++++ test/kafka/yaks-config.yaml | 48 +++++++++ 12 files changed, 522 insertions(+), 6 deletions(-) diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml index b4b7f91..e4b2d9d 100644 --- a/.github/workflows/yaks-tests.yaml +++ b/.github/workflows/yaks-tests.yaml @@ -115,6 +115,7 @@ jobs: yaks run test/timer-source $YAKS_RUN_OPTIONS yaks run test/earthquake-source $YAKS_RUN_OPTIONS yaks run test/rest-openapi-sink $YAKS_RUN_OPTIONS + yaks run test/kafka $YAKS_RUN_OPTIONS - name: YAKS Report if: failure() run: | diff --git a/kamelets/kafka-source.kamelet.yaml b/kamelets/kafka-source.kamelet.yaml index 82607c9..6bce42c 100644 --- a/kamelets/kafka-source.kamelet.yaml +++ b/kamelets/kafka-source.kamelet.yaml @@ -62,12 +62,12 @@ spec: default: SASL_SSL saslMechanism: title: SASL Mechanism - description: The Simple Authentication and Security Layer (SASL) Mechanism used. + description: The Simple Authentication and Security Layer (SASL) Mechanism used. 
type: string default: PLAIN user: title: Username - description: Username to authenticate to Kafka + description: Username to authenticate to Kafka type: string x-descriptors: - urn:camel:group:credentials @@ -117,7 +117,16 @@ spec: x-descriptors: - urn:keda:metadata:consumerGroup - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: false dependencies: + - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT + - "camel:core" - "camel:kafka" - "camel:kamelet" template: @@ -134,4 +143,10 @@ spec: autoOffsetReset: "{{autoOffsetReset}}" groupId: "{{?consumerGroup}}" steps: - - to: "kamelet:sink" + - set-property: + name: deserializeHeaders + constant: "{{deserializeHeaders}}" + - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + - remove-property: + name: deserializeHeaders + - to: "kamelet:sink" diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java new file mode 100644 index 0000000..7cab1ee --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.serialization.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.TypeConverter; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.support.SimpleTypeConverter; + +/** + * Header deserializer used in Kafka source Kamelet. Automatically converts all message headers to String. + * Uses given type converter implementation set on the Camel context to convert values. If no type converter is set + * the deserializer uses its own fallback conversion implementation. + */ +public class KafkaHeaderDeserializer { + + private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert); + + public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception { + if (!deserializeHeaders) { + return; + } + + Map<String, Object> headers = exchange.getMessage().getHeaders(); + + TypeConverter typeConverter = exchange.getContext().getTypeConverter(); + if (typeConverter == null) { + typeConverter = defaultTypeConverter; + } + + for (Map.Entry<String, Object> header : headers.entrySet()) { + if (shouldDeserialize(header)) { + header.setValue(typeConverter.convertTo(String.class, header.getValue())); + } + } + } + + /** + * Fallback conversion strategy supporting null values, String and byte[]. Converts headers to respective + * String representation or null. 
+ * @param type target type, always String in this case. + * @param exchange the exchange containing all headers to convert. + * @param value the current value to convert. + * @return String representation of given value or null if value itself is null. + */ + private static Object convert(Class<?> type, Exchange exchange, Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return value; + } + + if (value instanceof byte[]) { + return new String((byte[]) value, StandardCharsets.UTF_8); + } + + return value.toString(); + } + + /** + * Exclude special Kafka headers from auto deserialization. + * @param entry the header entry whose key is checked. + * @return false if the key equals KafkaConstants.HEADERS or KafkaConstants.MANUAL_COMMIT, true otherwise. + */ + private boolean shouldDeserialize(Map.Entry<String, Object> entry) { + return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT); + } +} diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java new file mode 100644 index 0000000..d5d92f5 --- /dev/null +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.serialization.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.SimpleTypeConverter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class KafkaHeaderDeserializerTest { + + private DefaultCamelContext camelContext; + + private final KafkaHeaderDeserializer processor = new KafkaHeaderDeserializer(); + + @BeforeEach + void setup() { + this.camelContext = new DefaultCamelContext(); + } + + @Test + void shouldDeserializeHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader("foo", "bar"); + exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); + exchange.getMessage().setHeader("fooNull", null); + exchange.getMessage().setHeader("number", 1L); + + processor.process(true, exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); + Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes")); + Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull")); + Assertions.assertNull(exchange.getMessage().getHeader("fooNull")); + Assertions.assertEquals("1", exchange.getMessage().getHeader("number")); + } + + @Test + 
void shouldDeserializeHeadersViaTypeConverter() throws Exception { + camelContext.setTypeConverter(new SimpleTypeConverter(true, (type, exchange, value) -> "converted")); + + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader("foo", "bar"); + exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); + exchange.getMessage().setHeader("fooNull", null); + + processor.process(true, exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo")); + Assertions.assertEquals("converted", exchange.getMessage().getHeader("fooBytes")); + Assertions.assertEquals("converted", exchange.getMessage().getHeader("fooNull")); + } + + @Test + void shouldFallbackToDefaultConverter() throws Exception { + camelContext.setTypeConverter(null); + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader("foo", "bar"); + exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); + + processor.process(true, exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); + Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes")); + } + + @Test + void shouldNotDeserializeHeadersWhenDisabled() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader("foo", "bar"); + exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8)); + + processor.process(false, exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo")); + Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes") instanceof byte[]); + Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)), 
Arrays.toString((byte[]) exchange.getMessage().getHeader("fooBytes"))); + } +} diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml index 82607c9..6bce42c 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml @@ -62,12 +62,12 @@ spec: default: SASL_SSL saslMechanism: title: SASL Mechanism - description: The Simple Authentication and Security Layer (SASL) Mechanism used. + description: The Simple Authentication and Security Layer (SASL) Mechanism used. type: string default: PLAIN user: title: Username - description: Username to authenticate to Kafka + description: Username to authenticate to Kafka type: string x-descriptors: - urn:camel:group:credentials @@ -117,7 +117,16 @@ spec: x-descriptors: - urn:keda:metadata:consumerGroup - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. 
+ type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: false dependencies: + - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT + - "camel:core" - "camel:kafka" - "camel:kamelet" template: @@ -134,4 +143,10 @@ spec: autoOffsetReset: "{{autoOffsetReset}}" groupId: "{{?consumerGroup}}" steps: - - to: "kamelet:sink" + - set-property: + name: deserializeHeaders + constant: "{{deserializeHeaders}}" + - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + - remove-property: + name: deserializeHeaders + - to: "kamelet:sink" diff --git a/test/kafka/install.sh b/test/kafka/install.sh new file mode 100644 index 0000000..c5415ba --- /dev/null +++ b/test/kafka/install.sh @@ -0,0 +1,28 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Install Kafka +kubectl create -f https://strimzi.io/install/latest?namespace=default + +# Apply the `Kafka` Cluster CR file +kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-ephemeral-single.yaml + +# wait for everything to start +kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s + +# create default topic +kubectl apply -f https://strimzi.io/examples/latest/topic/kafka-topic.yaml diff --git a/test/kafka/kafka-sink-test.yaml b/test/kafka/kafka-sink-test.yaml new file mode 100644 index 0000000..ffce924 --- /dev/null +++ b/test/kafka/kafka-sink-test.yaml @@ -0,0 +1,42 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: kafka-sink-test +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: timer-source + properties: + period: 5000 + contentType: application/json + message: ${message} + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: kafka-sink + properties: + bootstrapServers: ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} + user: ${user} + password: ${password} + topic: ${topic} + securityProtocol: ${securityProtocol} diff --git a/test/kafka/kafka-sink.feature b/test/kafka/kafka-sink.feature new file mode 100644 index 0000000..bc106f7 --- /dev/null +++ b/test/kafka/kafka-sink.feature @@ -0,0 +1,45 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +Feature: Kafka Kamelet sink + + Background: + Given variable user is "" + Given variable password is "" + Given variables + | bootstrap.server.host | my-cluster-kafka-bootstrap | + | bootstrap.server.port | 9092 | + | securityProtocol | PLAINTEXT | + | topic | my-topic | + | message | Camel K rocks! | + Given Kafka topic: ${topic} + Given Kafka topic partition: 0 + + Scenario: Create Kamelet binding + Given Camel K resource polling configuration + | maxAttempts | 200 | + | delayBetweenAttempts | 2000 | + When load KameletBinding kafka-sink-test.yaml + Then Camel K integration kafka-sink-test should be running + + Scenario: Receive message on Kafka topic and verify sink output + Given Kafka connection + | url | ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} | + Then receive Kafka message with body: ${message} + + Scenario: Remove resources + Given delete KameletBinding kafka-sink-test diff --git a/test/kafka/kafka-source-test.yaml b/test/kafka/kafka-source-test.yaml new file mode 100644 index 0000000..7fa6532 --- /dev/null +++ b/test/kafka/kafka-source-test.yaml @@ -0,0 +1,37 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: kafka-source-test +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: kafka-source + properties: + bootstrapServers: ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} + user: ${user} + password: ${password} + topic: ${topic} + securityProtocol: ${securityProtocol} + deserializeHeaders: ${deserializeHeaders} + sink: + uri: http://kafka-to-http-service.${YAKS_NAMESPACE}/result + diff --git a/test/kafka/kafka-source.feature b/test/kafka/kafka-source.feature new file mode 100644 index 0000000..b7759bc --- /dev/null +++ b/test/kafka/kafka-source.feature @@ -0,0 +1,66 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +Feature: Kafka Kamelet source + + Background: + Given variable user is "" + Given variable password is "" + Given variables + | bootstrap.server.host | my-cluster-kafka-bootstrap | + | bootstrap.server.port | 9092 | + | securityProtocol | PLAINTEXT | + | deserializeHeaders | true | + | topic | my-topic | + | source | Kafka Kamelet source | + | message | Camel K rocks! | + Given Kafka topic: ${topic} + Given Kafka topic partition: 0 + Given HTTP server timeout is 15000 ms + Given HTTP server "kafka-to-http-service" + + Scenario: Create Http server + Given create Kubernetes service kafka-to-http-service with target port 8080 + + Scenario: Create Kamelet binding + Given Camel K resource polling configuration + | maxAttempts | 200 | + | delayBetweenAttempts | 2000 | + When load KameletBinding kafka-source-test.yaml + Then Camel K integration kafka-source-test should be running + And Camel K integration kafka-source-test should print Subscribing ${topic}-Thread 0 to topic ${topic} + And sleep 10sec + + Scenario: Send message to Kafka topic and verify sink output + Given variable key is "citrus:randomNumber(4)" + Given Kafka connection + | url | ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} | + Given Kafka message key: ${key} + When send Kafka message with body and headers: ${message} + | event-source | ${source} | + Then expect HTTP request body: ${message} + Then expect HTTP request headers + | event-source | ${source} | + | kafka.TOPIC | ${topic} | + | kafka.KEY | ${key} | + | kafka.PARTITION | 0 | + And receive POST /result + And send HTTP 200 OK + + Scenario: Remove resources + Given delete KameletBinding kafka-source-test + And delete Kubernetes service kafka-to-http-service diff --git a/test/kafka/uninstall.sh b/test/kafka/uninstall.sh new file mode 100644 index 0000000..81b3599 --- /dev/null +++ b/test/kafka/uninstall.sh @@ -0,0 +1,22 @@ +#!/bin/sh + +# 
Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# delete Kafka cluster +kubectl delete kafka/my-cluster + +# delete default topic +kubectl delete kafkatopic/my-topic diff --git a/test/kafka/yaks-config.yaml b/test/kafka/yaks-config.yaml new file mode 100644 index 0000000..3d1997b --- /dev/null +++ b/test/kafka/yaks-config.yaml @@ -0,0 +1,48 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +config: + namespace: + temporary: false + runtime: + env: + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES + value: false + - name: CITRUS_TYPE_CONVERTER + value: camel + resources: + - kafka-source-test.yaml + - kafka-sink-test.yaml + dump: + enabled: true + failedOnly: true + includes: + - app=camel-k +pre: + - name: Install Kafka + if: env:CI=true + script: install.sh + - name: Setup Kafka roles + if: env:CI=true + run: | + yaks role --add strimzi +post: + - name: Uninstall Kafka + if: env:CI=true + script: uninstall.sh
