This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git

commit e5724510000d173d9d24a0f01326e58c6db288c6
Author: Christoph Deppisch <[email protected]>
AuthorDate: Thu Mar 17 19:57:06 2022 +0100

    chore: Add header deserialize option on Kafka source
    
    - Adds utility class to auto deserialize message headers from byte[] to 
String
    - Option must be explicitly enabled on the source Kamelet
    - Exclude non String Kafka headers from deserialization (kafka.HEADERS and 
CamelKafkaManualCommit)
---
 .github/workflows/yaks-tests.yaml                  |   1 +
 kamelets/kafka-source.kamelet.yaml                 |  21 +++-
 .../kafka/KafkaHeaderDeserializer.java             |  89 +++++++++++++++++
 .../kafka/KafkaHeaderDeserializerTest.java         | 108 +++++++++++++++++++++
 .../resources/kamelets/kafka-source.kamelet.yaml   |  21 +++-
 test/kafka/install.sh                              |  28 ++++++
 test/kafka/kafka-sink-test.yaml                    |  42 ++++++++
 test/kafka/kafka-sink.feature                      |  45 +++++++++
 test/kafka/kafka-source-test.yaml                  |  37 +++++++
 test/kafka/kafka-source.feature                    |  66 +++++++++++++
 test/kafka/uninstall.sh                            |  22 +++++
 test/kafka/yaks-config.yaml                        |  48 +++++++++
 12 files changed, 522 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/yaks-tests.yaml 
b/.github/workflows/yaks-tests.yaml
index b4b7f91..e4b2d9d 100644
--- a/.github/workflows/yaks-tests.yaml
+++ b/.github/workflows/yaks-tests.yaml
@@ -115,6 +115,7 @@ jobs:
         yaks run test/timer-source $YAKS_RUN_OPTIONS
         yaks run test/earthquake-source $YAKS_RUN_OPTIONS
         yaks run test/rest-openapi-sink $YAKS_RUN_OPTIONS
+        yaks run test/kafka $YAKS_RUN_OPTIONS
     - name: YAKS Report
       if: failure()
       run: |
diff --git a/kamelets/kafka-source.kamelet.yaml 
b/kamelets/kafka-source.kamelet.yaml
index 82607c9..6bce42c 100644
--- a/kamelets/kafka-source.kamelet.yaml
+++ b/kamelets/kafka-source.kamelet.yaml
@@ -62,12 +62,12 @@ spec:
         default: SASL_SSL
       saslMechanism:
         title: SASL Mechanism
-        description: The Simple Authentication and Security Layer (SASL) 
Mechanism used. 
+        description: The Simple Authentication and Security Layer (SASL) 
Mechanism used.
         type: string
         default: PLAIN
       user:
         title: Username
-        description: Username to authenticate to Kafka 
+        description: Username to authenticate to Kafka
         type: string
         x-descriptors:
         - urn:camel:group:credentials
@@ -117,7 +117,16 @@ spec:
         x-descriptors:
         - urn:keda:metadata:consumerGroup
         - urn:keda:required
+      deserializeHeaders:
+        title: Automatically Deserialize Headers
+        description: When enabled the Kamelet source will deserialize all 
message headers to String representation.
+        type: boolean
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
   dependencies:
+    - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
+    - "camel:core"
     - "camel:kafka"
     - "camel:kamelet"
   template:
@@ -134,4 +143,10 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-      - to: "kamelet:sink"
+        - set-property:
+            name: deserializeHeaders
+            constant: "{{deserializeHeaders}}"
+        - bean: 
"org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        - remove-property:
+            name: deserializeHeaders
+        - to: "kamelet:sink"
diff --git 
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
new file mode 100644
index 0000000..7cab1ee
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.serialization.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.support.SimpleTypeConverter;
+
+/**
+ * Header deserializer used in Kafka source Kamelet. Automatically converts 
all message headers to String.
+ * Uses given type converter implementation set on the Camel context to 
convert values. If no type converter is set
+ * the deserializer uses its own fallback conversion implementation.
+ */
+public class KafkaHeaderDeserializer {
+
+    private final SimpleTypeConverter defaultTypeConverter = new 
SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
+
+    public void process(@ExchangeProperty("deserializeHeaders") boolean 
deserializeHeaders, Exchange exchange) throws Exception {
+        if (!deserializeHeaders) {
+            return;
+        }
+
+        Map<String, Object> headers = exchange.getMessage().getHeaders();
+
+        TypeConverter typeConverter = exchange.getContext().getTypeConverter();
+        if (typeConverter == null) {
+            typeConverter = defaultTypeConverter;
+        }
+
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+            if (shouldDeserialize(header)) {
+                header.setValue(typeConverter.convertTo(String.class, 
header.getValue()));
+            }
+        }
+    }
+
+    /**
+     * Fallback conversion strategy supporting null values, String and byte[]. 
Converts headers to respective
+     * String representation or null.
+     * @param type target type, always String in this case.
+     * @param exchange the exchange containing all headers to convert.
+     * @param value the current value to convert.
+     * @return String representation of given value or null if value itself is 
null.
+     */
+    private static Object convert(Class<?> type, Exchange exchange, Object 
value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        if (value instanceof byte[]) {
+            return new String((byte[]) value, StandardCharsets.UTF_8);
+        }
+
+        return value.toString();
+    }
+
+    /**
+     * Exclude special Kafka headers from auto deserialization.
+     * @param entry
+     * @return
+     */
+    private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
+        return !entry.getKey().equals(KafkaConstants.HEADERS) && 
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
+    }
+}
diff --git 
a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
new file mode 100644
index 0000000..d5d92f5
--- /dev/null
+++ 
b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.serialization.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.SimpleTypeConverter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class KafkaHeaderDeserializerTest {
+
+    private DefaultCamelContext camelContext;
+
+    private final KafkaHeaderDeserializer processor = new 
KafkaHeaderDeserializer();
+
+    @BeforeEach
+    void setup() {
+        this.camelContext = new DefaultCamelContext();
+    }
+
+    @Test
+    void shouldDeserializeHeaders() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", 
"barBytes".getBytes(StandardCharsets.UTF_8));
+        exchange.getMessage().setHeader("fooNull", null);
+        exchange.getMessage().setHeader("number", 1L);
+
+        processor.process(true, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("barBytes", 
exchange.getMessage().getHeader("fooBytes"));
+        
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull"));
+        Assertions.assertNull(exchange.getMessage().getHeader("fooNull"));
+        Assertions.assertEquals("1", 
exchange.getMessage().getHeader("number"));
+    }
+
+    @Test
+    void shouldDeserializeHeadersViaTypeConverter() throws Exception {
+        camelContext.setTypeConverter(new SimpleTypeConverter(true, (type, 
exchange, value) -> "converted"));
+
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", 
"barBytes".getBytes(StandardCharsets.UTF_8));
+        exchange.getMessage().setHeader("fooNull", null);
+
+        processor.process(true, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("converted", 
exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("converted", 
exchange.getMessage().getHeader("fooBytes"));
+        Assertions.assertEquals("converted", 
exchange.getMessage().getHeader("fooNull"));
+    }
+
+    @Test
+    void shouldFallbackToDefaultConverter() throws Exception {
+        camelContext.setTypeConverter(null);
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", 
"barBytes".getBytes(StandardCharsets.UTF_8));
+
+        processor.process(true, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertEquals("barBytes", 
exchange.getMessage().getHeader("fooBytes"));
+    }
+
+    @Test
+    void shouldNotDeserializeHeadersWhenDisabled() throws Exception {
+        Exchange exchange = new DefaultExchange(camelContext);
+
+        exchange.getMessage().setHeader("foo", "bar");
+        exchange.getMessage().setHeader("fooBytes", 
"barBytes".getBytes(StandardCharsets.UTF_8));
+
+        processor.process(false, exchange);
+
+        Assertions.assertTrue(exchange.getMessage().hasHeaders());
+        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+        Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes") 
instanceof byte[]);
+        
Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)),
 Arrays.toString((byte[]) exchange.getMessage().getHeader("fooBytes")));
+    }
+}
diff --git 
a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml 
b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
index 82607c9..6bce42c 100644
--- 
a/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
+++ 
b/library/camel-kamelets/src/main/resources/kamelets/kafka-source.kamelet.yaml
@@ -62,12 +62,12 @@ spec:
         default: SASL_SSL
       saslMechanism:
         title: SASL Mechanism
-        description: The Simple Authentication and Security Layer (SASL) 
Mechanism used. 
+        description: The Simple Authentication and Security Layer (SASL) 
Mechanism used.
         type: string
         default: PLAIN
       user:
         title: Username
-        description: Username to authenticate to Kafka 
+        description: Username to authenticate to Kafka
         type: string
         x-descriptors:
         - urn:camel:group:credentials
@@ -117,7 +117,16 @@ spec:
         x-descriptors:
         - urn:keda:metadata:consumerGroup
         - urn:keda:required
+      deserializeHeaders:
+        title: Automatically Deserialize Headers
+        description: When enabled the Kamelet source will deserialize all 
message headers to String representation.
+        type: boolean
+        x-descriptors:
+          - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+        default: false
   dependencies:
+    - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
+    - "camel:core"
     - "camel:kafka"
     - "camel:kamelet"
   template:
@@ -134,4 +143,10 @@ spec:
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-      - to: "kamelet:sink"
+        - set-property:
+            name: deserializeHeaders
+            constant: "{{deserializeHeaders}}"
+        - bean: 
"org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        - remove-property:
+            name: deserializeHeaders
+        - to: "kamelet:sink"
diff --git a/test/kafka/install.sh b/test/kafka/install.sh
new file mode 100644
index 0000000..c5415ba
--- /dev/null
+++ b/test/kafka/install.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Install Kafka
+kubectl create -f https://strimzi.io/install/latest?namespace=default
+
+# Apply the `Kafka` Cluster CR file
+kubectl apply -f 
https://strimzi.io/examples/latest/kafka/kafka-ephemeral-single.yaml
+
+# wait for everything to start
+kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s
+
+# create default topic
+kubectl apply -f https://strimzi.io/examples/latest/topic/kafka-topic.yaml
diff --git a/test/kafka/kafka-sink-test.yaml b/test/kafka/kafka-sink-test.yaml
new file mode 100644
index 0000000..ffce924
--- /dev/null
+++ b/test/kafka/kafka-sink-test.yaml
@@ -0,0 +1,42 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: kafka-sink-test
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: timer-source
+    properties:
+      period: 5000
+      contentType: application/json
+      message: ${message}
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: kafka-sink
+    properties:
+      bootstrapServers: 
${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port}
+      user: ${user}
+      password: ${password}
+      topic: ${topic}
+      securityProtocol: ${securityProtocol}
diff --git a/test/kafka/kafka-sink.feature b/test/kafka/kafka-sink.feature
new file mode 100644
index 0000000..bc106f7
--- /dev/null
+++ b/test/kafka/kafka-sink.feature
@@ -0,0 +1,45 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+Feature: Kafka Kamelet sink
+
+  Background:
+    Given variable user is ""
+    Given variable password is ""
+    Given variables
+      | bootstrap.server.host     | my-cluster-kafka-bootstrap |
+      | bootstrap.server.port     | 9092 |
+      | securityProtocol          | PLAINTEXT |
+      | topic                     | my-topic |
+      | message                   | Camel K rocks! |
+    Given Kafka topic: ${topic}
+    Given Kafka topic partition: 0
+
+  Scenario: Create Kamelet binding
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    When load KameletBinding kafka-sink-test.yaml
+    Then Camel K integration kafka-sink-test should be running
+
+  Scenario: Receive message on Kafka topic and verify sink output
+    Given Kafka connection
+      | url         | 
${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} |
+    Then receive Kafka message with body: ${message}
+
+  Scenario: Remove resources
+    Given delete KameletBinding kafka-sink-test
diff --git a/test/kafka/kafka-source-test.yaml 
b/test/kafka/kafka-source-test.yaml
new file mode 100644
index 0000000..7fa6532
--- /dev/null
+++ b/test/kafka/kafka-source-test.yaml
@@ -0,0 +1,37 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: kafka-source-test
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: kafka-source
+    properties:
+      bootstrapServers: 
${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port}
+      user: ${user}
+      password: ${password}
+      topic: ${topic}
+      securityProtocol: ${securityProtocol}
+      deserializeHeaders: ${deserializeHeaders}
+  sink:
+    uri: http://kafka-to-http-service.${YAKS_NAMESPACE}/result
+
diff --git a/test/kafka/kafka-source.feature b/test/kafka/kafka-source.feature
new file mode 100644
index 0000000..b7759bc
--- /dev/null
+++ b/test/kafka/kafka-source.feature
@@ -0,0 +1,66 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+Feature: Kafka Kamelet source
+
+  Background:
+    Given variable user is ""
+    Given variable password is ""
+    Given variables
+      | bootstrap.server.host     | my-cluster-kafka-bootstrap |
+      | bootstrap.server.port     | 9092 |
+      | securityProtocol          | PLAINTEXT |
+      | deserializeHeaders        | true |
+      | topic                     | my-topic |
+      | source                    | Kafka Kamelet source |
+      | message                   | Camel K rocks! |
+    Given Kafka topic: ${topic}
+    Given Kafka topic partition: 0
+    Given HTTP server timeout is 15000 ms
+    Given HTTP server "kafka-to-http-service"
+
+  Scenario: Create Http server
+    Given create Kubernetes service kafka-to-http-service with target port 8080
+
+  Scenario: Create Kamelet binding
+    Given Camel K resource polling configuration
+      | maxAttempts          | 200   |
+      | delayBetweenAttempts | 2000  |
+    When load KameletBinding kafka-source-test.yaml
+    Then Camel K integration kafka-source-test should be running
+    And Camel K integration kafka-source-test should print Subscribing 
${topic}-Thread 0 to topic ${topic}
+    And sleep 10sec
+
+  Scenario: Send message to Kafka topic and verify sink output
+    Given variable key is "citrus:randomNumber(4)"
+    Given Kafka connection
+      | url         | 
${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} |
+    Given Kafka message key: ${key}
+    When send Kafka message with body and headers: ${message}
+      | event-source | ${source} |
+    Then expect HTTP request body: ${message}
+    Then expect HTTP request headers
+      | event-source    | ${source} |
+      | kafka.TOPIC     | ${topic}  |
+      | kafka.KEY       | ${key}    |
+      | kafka.PARTITION | 0         |
+    And receive POST /result
+    And send HTTP 200 OK
+
+  Scenario: Remove resources
+    Given delete KameletBinding kafka-source-test
+    And delete Kubernetes service kafka-to-http-service
diff --git a/test/kafka/uninstall.sh b/test/kafka/uninstall.sh
new file mode 100644
index 0000000..81b3599
--- /dev/null
+++ b/test/kafka/uninstall.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# delete Kafka cluster
+kubectl delete kafka/my-cluster
+
+# delete default topic
+kubectl delete kafkatopic/my-topic
diff --git a/test/kafka/yaks-config.yaml b/test/kafka/yaks-config.yaml
new file mode 100644
index 0000000..3d1997b
--- /dev/null
+++ b/test/kafka/yaks-config.yaml
@@ -0,0 +1,48 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+config:
+  namespace:
+    temporary: false
+  runtime:
+    env:
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: CITRUS_TYPE_CONVERTER
+        value: camel
+    resources:
+      - kafka-source-test.yaml
+      - kafka-sink-test.yaml
+  dump:
+    enabled: true
+    failedOnly: true
+    includes:
+      - app=camel-k
+pre:
+  - name: Install Kafka
+    if: env:CI=true
+    script: install.sh
+  - name: Setup Kafka roles
+    if: env:CI=true
+    run: |
+      yaks role --add strimzi
+post:
+  - name: Uninstall Kafka
+    if: env:CI=true
+    script: uninstall.sh

Reply via email to