This is an automated email from the ASF dual-hosted git repository.

cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


The following commit(s) were added to refs/heads/main by this push:
     new acfb2ab68 Stabilize integration tests
acfb2ab68 is described below

commit acfb2ab68df950e2dbfb2f98cdd3fd5f78b0ed9a
Author: Christoph Deppisch <[email protected]>
AuthorDate: Mon Sep 8 20:53:16 2025 +0200

    Stabilize integration tests
    
    - Stop Kafka consumers after test
    - Use separate Slack server instances to avoid pending messages
---
 .../src/test/java/KafkaIT.java                     | 29 +++++++++++++++++++++-
 .../src/test/java/SlackIT.java                     | 27 ++++++++++----------
 .../test/resources/kafka/kafka-router-pipe.it.yaml | 12 ++++++++-
 .../test/resources/kafka/kafka-sink-pipe.it.yaml   | 10 +++++++-
 .../test/resources/kafka/kafka-source-pipe.it.yaml | 10 +++++++-
 .../test/resources/slack/application.properties    |  2 +-
 .../test/resources/slack/slack-sink-pipe.it.yaml   |  7 +++---
 .../test/resources/slack/slack-source-pipe.it.yaml | 20 +++++++++------
 8 files changed, 89 insertions(+), 28 deletions(-)

diff --git a/tests/camel-kamelets-itest/src/test/java/KafkaIT.java 
b/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
index c105e8074..dbe3677f5 100644
--- a/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
+++ b/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
@@ -17,9 +17,12 @@
 
 import java.util.stream.Stream;
 
+import org.citrusframework.TestAction;
+import org.citrusframework.common.ShutdownPhase;
 import org.citrusframework.common.TestLoader;
 import org.citrusframework.container.SequenceAfterTest;
 import org.citrusframework.container.SequenceBeforeTest;
+import org.citrusframework.endpoint.Endpoint;
 import org.citrusframework.http.server.HttpServer;
 import org.citrusframework.junit.jupiter.CitrusSupport;
 import org.citrusframework.junit.jupiter.CitrusTestFactory;
@@ -58,10 +61,34 @@ public class KafkaIT {
     @BindToRegistry
     public SequenceAfterTest afterKafka() {
         return new SequenceAfterTest.Builder().onTests("kafka-*").actions(
-                purgeEndpoints().endpoint(kafkaSinkServer)
+                purgeEndpoints().endpoint(kafkaSinkServer),
+                this::stopKafkaConsumers
         ).build();
     }
 
+    @BindToRegistry
+    public TestAction stopKafkaConsumers() {
+        return context -> {
+            try {
+                Endpoint endpoint = 
context.getEndpointFactory().create("kafkaRouterConsumer", context);
+                if (endpoint instanceof ShutdownPhase destroyable) {
+                    destroyable.destroy();
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+
+            try {
+                Endpoint endpoint = 
context.getEndpointFactory().create("kafkaSinkConsumer", context);
+                if (endpoint instanceof ShutdownPhase destroyable) {
+                    destroyable.destroy();
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+        };
+    }
+
     @CitrusTestFactory
     public Stream<DynamicTest> kafka() {
         return 
CitrusTestFactorySupport.factory(TestLoader.YAML).packageScan("kafka");
diff --git a/tests/camel-kamelets-itest/src/test/java/SlackIT.java 
b/tests/camel-kamelets-itest/src/test/java/SlackIT.java
index 9efc372e7..c89ccaf93 100644
--- a/tests/camel-kamelets-itest/src/test/java/SlackIT.java
+++ b/tests/camel-kamelets-itest/src/test/java/SlackIT.java
@@ -18,7 +18,6 @@
 import java.util.stream.Stream;
 
 import org.citrusframework.common.TestLoader;
-import org.citrusframework.container.SequenceAfterTest;
 import org.citrusframework.container.SequenceBeforeTest;
 import org.citrusframework.http.server.HttpServer;
 import org.citrusframework.junit.jupiter.CitrusSupport;
@@ -29,33 +28,35 @@ import org.citrusframework.util.SocketUtils;
 import org.junit.jupiter.api.DynamicTest;
 
 import static 
org.citrusframework.actions.CreateVariablesAction.Builder.createVariables;
-import static 
org.citrusframework.actions.PurgeEndpointAction.Builder.purgeEndpoints;
 import static org.citrusframework.http.endpoint.builder.HttpEndpoints.http;
 
 @CitrusSupport
 public class SlackIT {
 
-    private final int slackServerPort = SocketUtils.findAvailableTcpPort();
+    private final int slackSinkServerPort = SocketUtils.findAvailableTcpPort();
+    private final int slackSourceServerPort = 
SocketUtils.findAvailableTcpPort();
 
     @BindToRegistry
-    HttpServer slackServer = http()
+    HttpServer slackSinkServer = http()
                 .server()
-                .port(slackServerPort)
+                .port(slackSinkServerPort)
                 .timeout(120000L)
                 .autoStart(true)
                 .build();
 
     @BindToRegistry
-    public SequenceBeforeTest beforeSlack() {
-        return new SequenceBeforeTest.Builder().onTests("slack-*").actions(
-                createVariables().variable("slack.server.port", 
String.valueOf(slackServerPort))
-        ).build();
-    }
+    HttpServer slackSourceServer = http()
+                .server()
+                .port(slackSourceServerPort)
+                .timeout(120000L)
+                .autoStart(true)
+                .build();
 
     @BindToRegistry
-    public SequenceAfterTest afterSlack() {
-        return new SequenceAfterTest.Builder().onTests("slack-*").actions(
-                purgeEndpoints().endpoint(slackServer)
+    public SequenceBeforeTest beforeSlack() {
+        return new SequenceBeforeTest.Builder().onTests("slack-*").actions(
+                createVariables().variable("slack.source.server.port", 
String.valueOf(slackSourceServerPort)),
+                createVariables().variable("slack.sink.server.port", 
String.valueOf(slackSinkServerPort))
         ).build();
     }
 
diff --git 
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml 
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
index d3289c0ff..023336a67 100644
--- 
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
+++ 
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
@@ -65,9 +65,19 @@ actions:
             - name: "message"
               value: "citrus:urlEncode(${kafka.message})"
 
+  # Kafka consumer endpoint
+  - createEndpoint:
+      name: kafkaRouterConsumer
+      type: "kafka"
+      properties:
+        topic: "${topic}"
+        server: "${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}"
+        consumerGroup: "consumer-1"
+        offsetReset: "earliest"
+
   # Verify Kafka event
   - receive:
-      endpoint: 
kafka:${topic}?server=${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}&consumerGroup=consumer-1&offsetReset=earliest
+      endpoint: kafkaRouterConsumer
       message:
         body:
           data: |
diff --git 
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml 
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
index 2c9baa4f3..49f3ab6fd 100644
--- 
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
+++ 
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
@@ -49,9 +49,17 @@ actions:
                 - name: "kafka.message"
                   value: "${kafka.message}"
 
+  # Kafka consumer endpoint
+  - createEndpoint:
+      name: kafkaSinkConsumer
+      type: "kafka"
+      properties:
+        topic: "${kafka.topic}"
+        server: "${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}"
+
   # Verify Kafka event
   - receive:
-      endpoint: 
kafka:${kafka.topic}?server=${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}
+      endpoint: kafkaSinkConsumer
       message:
         body:
           data: |
diff --git 
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml 
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
index a432d9506..81b05b810 100644
--- 
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
+++ 
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
@@ -58,9 +58,17 @@ actions:
           integration: "kafka-source-pipe"
           logMessage: "Subscribed to topic(s): ${kafka.topic}"
 
+  # Kafka producer endpoint
+  - createEndpoint:
+      name: kafkaProducer
+      type: "kafka"
+      properties:
+        topic: "${kafka.topic}"
+        server: "${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}"
+
   # Push event to Kafka
   - send:
-      endpoint: 
kafka:${kafka.topic}?server=${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}
+      endpoint: kafkaProducer
       message:
         headers:
           - name: "event-source"
diff --git 
a/tests/camel-kamelets-itest/src/test/resources/slack/application.properties 
b/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
index 60d706f23..18c38147f 100644
--- a/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
+++ b/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
@@ -15,6 +15,6 @@
 # limitations under the License.
 #
 
-slack.server.url=http://localhost:${slack.server.port}
+slack.server.url=${slack.server.url}
 slack.channel=${slack.channel}
 slack.token=${slack.token}
diff --git 
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml 
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
index 3053449ca..399b199de 100644
--- 
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
+++ 
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
@@ -33,7 +33,8 @@ actions:
   - createVariables:
       variables:
         - name: "slack.server.url"
-          value: "http://localhost:${slack.server.port}";
+          value: "http://localhost:${slack.sink.server.port}";
+
   # Create Camel JBang integration
   - camel:
       jbang:
@@ -51,7 +52,7 @@ actions:
 
   # Verify message post request
   - http:
-      server: "slackServer"
+      server: "slackSinkServer"
       receiveRequest:
         POST:
           path: "/"
@@ -64,7 +65,7 @@ actions:
               }
 
   - http:
-      server: "slackServer"
+      server: "slackSinkServer"
       sendResponse:
         response:
           status: 200
diff --git 
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml 
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
index a3a5ce907..3d16aa362 100644
--- 
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
+++ 
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
@@ -31,7 +31,8 @@ actions:
   - createVariables:
       variables:
         - name: "slack.server.url"
-          value: "http://localhost:${slack.server.port}";
+          value: "http://localhost:${slack.source.server.port}";
+
   # Create Camel JBang integration
   - camel:
       jbang:
@@ -45,9 +46,14 @@ actions:
                 - name: "slack.message"
                   value: "${slack.message}"
 
+  # Make sure to remove old messages from server for more stable tests
+  - purge:
+      endpoints:
+        - ref: "slackSourceServer"
+
   # Verify auth test request
   - http:
-      server: "slackServer"
+      server: "slackSourceServer"
       receiveRequest:
         POST:
           path: "/api/auth.test"
@@ -57,7 +63,7 @@ actions:
               value: "Bearer ${slack.token}"
 
   - http:
-      server: "slackServer"
+      server: "slackSourceServer"
       sendResponse:
         response:
           status: 200
@@ -70,7 +76,7 @@ actions:
 
   # Verify conversations list request
   - http:
-      server: "slackServer"
+      server: "slackSourceServer"
       receiveRequest:
         POST:
           path: "/api/conversations.list"
@@ -80,7 +86,7 @@ actions:
               value: "Bearer ${slack.token}"
 
   - http:
-      server: "slackServer"
+      server: "slackSourceServer"
       sendResponse:
         response:
           status: 200
@@ -93,7 +99,7 @@ actions:
 
   # Verify conversations history request
   - http:
-      server: "slackServer"
+      server: "slackSourceServer"
       receiveRequest:
         POST:
           path: "/api/conversations.history"
@@ -103,7 +109,7 @@ actions:
               value: "Bearer ${slack.token}"
 
   - http:
-      server: "slackServer"
+      server: "slackSourceServer"
       sendResponse:
         response:
           status: 200

Reply via email to