This is an automated email from the ASF dual-hosted git repository.
cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push:
new acfb2ab68 Stabilize integration tests
acfb2ab68 is described below
commit acfb2ab68df950e2dbfb2f98cdd3fd5f78b0ed9a
Author: Christoph Deppisch <[email protected]>
AuthorDate: Mon Sep 8 20:53:16 2025 +0200
Stabilize integration tests
- Stop Kafka consumers after test
- Use separate Slack server instances to avoid pending messages
---
.../src/test/java/KafkaIT.java | 29 +++++++++++++++++++++-
.../src/test/java/SlackIT.java | 27 ++++++++++----------
.../test/resources/kafka/kafka-router-pipe.it.yaml | 12 ++++++++-
.../test/resources/kafka/kafka-sink-pipe.it.yaml | 10 +++++++-
.../test/resources/kafka/kafka-source-pipe.it.yaml | 10 +++++++-
.../test/resources/slack/application.properties | 2 +-
.../test/resources/slack/slack-sink-pipe.it.yaml | 7 +++---
.../test/resources/slack/slack-source-pipe.it.yaml | 20 +++++++++------
8 files changed, 89 insertions(+), 28 deletions(-)
diff --git a/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
b/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
index c105e8074..dbe3677f5 100644
--- a/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
+++ b/tests/camel-kamelets-itest/src/test/java/KafkaIT.java
@@ -17,9 +17,12 @@
import java.util.stream.Stream;
+import org.citrusframework.TestAction;
+import org.citrusframework.common.ShutdownPhase;
import org.citrusframework.common.TestLoader;
import org.citrusframework.container.SequenceAfterTest;
import org.citrusframework.container.SequenceBeforeTest;
+import org.citrusframework.endpoint.Endpoint;
import org.citrusframework.http.server.HttpServer;
import org.citrusframework.junit.jupiter.CitrusSupport;
import org.citrusframework.junit.jupiter.CitrusTestFactory;
@@ -58,10 +61,34 @@ public class KafkaIT {
@BindToRegistry
public SequenceAfterTest afterKafka() {
return new SequenceAfterTest.Builder().onTests("kafka-*").actions(
- purgeEndpoints().endpoint(kafkaSinkServer)
+ purgeEndpoints().endpoint(kafkaSinkServer),
+ this::stopKafkaConsumers
).build();
}
+ @BindToRegistry
+ public TestAction stopKafkaConsumers() {
+ return context -> {
+ try {
+ Endpoint endpoint =
context.getEndpointFactory().create("kafkaRouterConsumer", context);
+ if (endpoint instanceof ShutdownPhase destroyable) {
+ destroyable.destroy();
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+
+ try {
+ Endpoint endpoint =
context.getEndpointFactory().create("kafkaSinkConsumer", context);
+ if (endpoint instanceof ShutdownPhase destroyable) {
+ destroyable.destroy();
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ };
+ }
+
@CitrusTestFactory
public Stream<DynamicTest> kafka() {
return
CitrusTestFactorySupport.factory(TestLoader.YAML).packageScan("kafka");
diff --git a/tests/camel-kamelets-itest/src/test/java/SlackIT.java
b/tests/camel-kamelets-itest/src/test/java/SlackIT.java
index 9efc372e7..c89ccaf93 100644
--- a/tests/camel-kamelets-itest/src/test/java/SlackIT.java
+++ b/tests/camel-kamelets-itest/src/test/java/SlackIT.java
@@ -18,7 +18,6 @@
import java.util.stream.Stream;
import org.citrusframework.common.TestLoader;
-import org.citrusframework.container.SequenceAfterTest;
import org.citrusframework.container.SequenceBeforeTest;
import org.citrusframework.http.server.HttpServer;
import org.citrusframework.junit.jupiter.CitrusSupport;
@@ -29,33 +28,35 @@ import org.citrusframework.util.SocketUtils;
import org.junit.jupiter.api.DynamicTest;
import static
org.citrusframework.actions.CreateVariablesAction.Builder.createVariables;
-import static
org.citrusframework.actions.PurgeEndpointAction.Builder.purgeEndpoints;
import static org.citrusframework.http.endpoint.builder.HttpEndpoints.http;
@CitrusSupport
public class SlackIT {
- private final int slackServerPort = SocketUtils.findAvailableTcpPort();
+ private final int slackSinkServerPort = SocketUtils.findAvailableTcpPort();
+ private final int slackSourceServerPort =
SocketUtils.findAvailableTcpPort();
@BindToRegistry
- HttpServer slackServer = http()
+ HttpServer slackSinkServer = http()
.server()
- .port(slackServerPort)
+ .port(slackSinkServerPort)
.timeout(120000L)
.autoStart(true)
.build();
@BindToRegistry
- public SequenceBeforeTest beforeSlack() {
- return new SequenceBeforeTest.Builder().onTests("slack-*").actions(
- createVariables().variable("slack.server.port",
String.valueOf(slackServerPort))
- ).build();
- }
+ HttpServer slackSourceServer = http()
+ .server()
+ .port(slackSourceServerPort)
+ .timeout(120000L)
+ .autoStart(true)
+ .build();
@BindToRegistry
- public SequenceAfterTest afterSlack() {
- return new SequenceAfterTest.Builder().onTests("slack-*").actions(
- purgeEndpoints().endpoint(slackServer)
+ public SequenceBeforeTest beforeSlack() {
+ return new SequenceBeforeTest.Builder().onTests("slack-*").actions(
+ createVariables().variable("slack.source.server.port",
String.valueOf(slackSourceServerPort)),
+ createVariables().variable("slack.sink.server.port",
String.valueOf(slackSinkServerPort))
).build();
}
diff --git
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
index d3289c0ff..023336a67 100644
---
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
+++
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-router-pipe.it.yaml
@@ -65,9 +65,19 @@ actions:
- name: "message"
value: "citrus:urlEncode(${kafka.message})"
+ # Kafka consumer endpoint
+ - createEndpoint:
+ name: kafkaRouterConsumer
+ type: "kafka"
+ properties:
+ topic: "${topic}"
+ server: "${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}"
+ consumerGroup: "consumer-1"
+ offsetReset: "earliest"
+
# Verify Kafka event
- receive:
- endpoint:
kafka:${topic}?server=${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}&consumerGroup=consumer-1&offsetReset=earliest
+ endpoint: kafkaRouterConsumer
message:
body:
data: |
diff --git
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
index 2c9baa4f3..49f3ab6fd 100644
---
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
+++
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-sink-pipe.it.yaml
@@ -49,9 +49,17 @@ actions:
- name: "kafka.message"
value: "${kafka.message}"
+ # Kafka consumer endpoint
+ - createEndpoint:
+ name: kafkaSinkConsumer
+ type: "kafka"
+ properties:
+ topic: "${kafka.topic}"
+ server: "${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}"
+
# Verify Kafka event
- receive:
- endpoint:
kafka:${kafka.topic}?server=${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}
+ endpoint: kafkaSinkConsumer
message:
body:
data: |
diff --git
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
index a432d9506..81b05b810 100644
---
a/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
+++
b/tests/camel-kamelets-itest/src/test/resources/kafka/kafka-source-pipe.it.yaml
@@ -58,9 +58,17 @@ actions:
integration: "kafka-source-pipe"
logMessage: "Subscribed to topic(s): ${kafka.topic}"
+ # Kafka producer endpoint
+ - createEndpoint:
+ name: kafkaProducer
+ type: "kafka"
+ properties:
+ topic: "${kafka.topic}"
+ server: "${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}"
+
# Push event to Kafka
- send:
- endpoint:
kafka:${kafka.topic}?server=${CITRUS_TESTCONTAINERS_REDPANDA_BOOTSTRAP_SERVERS}
+ endpoint: kafkaProducer
message:
headers:
- name: "event-source"
diff --git
a/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
b/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
index 60d706f23..18c38147f 100644
--- a/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
+++ b/tests/camel-kamelets-itest/src/test/resources/slack/application.properties
@@ -15,6 +15,6 @@
# limitations under the License.
#
-slack.server.url=http://localhost:${slack.server.port}
+slack.server.url=${slack.server.url}
slack.channel=${slack.channel}
slack.token=${slack.token}
diff --git
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
index 3053449ca..399b199de 100644
---
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
+++
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-sink-pipe.it.yaml
@@ -33,7 +33,8 @@ actions:
- createVariables:
variables:
- name: "slack.server.url"
- value: "http://localhost:${slack.server.port}"
+ value: "http://localhost:${slack.sink.server.port}"
+
# Create Camel JBang integration
- camel:
jbang:
@@ -51,7 +52,7 @@ actions:
# Verify message post request
- http:
- server: "slackServer"
+ server: "slackSinkServer"
receiveRequest:
POST:
path: "/"
@@ -64,7 +65,7 @@ actions:
}
- http:
- server: "slackServer"
+ server: "slackSinkServer"
sendResponse:
response:
status: 200
diff --git
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
index a3a5ce907..3d16aa362 100644
---
a/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
+++
b/tests/camel-kamelets-itest/src/test/resources/slack/slack-source-pipe.it.yaml
@@ -31,7 +31,8 @@ actions:
- createVariables:
variables:
- name: "slack.server.url"
- value: "http://localhost:${slack.server.port}"
+ value: "http://localhost:${slack.source.server.port}"
+
# Create Camel JBang integration
- camel:
jbang:
@@ -45,9 +46,14 @@ actions:
- name: "slack.message"
value: "${slack.message}"
+ # Make sure to remove old messages from server for more stable tests
+ - purge:
+ endpoints:
+ - ref: "slackSourceServer"
+
# Verify auth test request
- http:
- server: "slackServer"
+ server: "slackSourceServer"
receiveRequest:
POST:
path: "/api/auth.test"
@@ -57,7 +63,7 @@ actions:
value: "Bearer ${slack.token}"
- http:
- server: "slackServer"
+ server: "slackSourceServer"
sendResponse:
response:
status: 200
@@ -70,7 +76,7 @@ actions:
# Verify conversations list request
- http:
- server: "slackServer"
+ server: "slackSourceServer"
receiveRequest:
POST:
path: "/api/conversations.list"
@@ -80,7 +86,7 @@ actions:
value: "Bearer ${slack.token}"
- http:
- server: "slackServer"
+ server: "slackSourceServer"
sendResponse:
response:
status: 200
@@ -93,7 +99,7 @@ actions:
# Verify conversations history request
- http:
- server: "slackServer"
+ server: "slackSourceServer"
receiveRequest:
POST:
path: "/api/conversations.history"
@@ -103,7 +109,7 @@ actions:
value: "Bearer ${slack.token}"
- http:
- server: "slackServer"
+ server: "slackSourceServer"
sendResponse:
response:
status: 200