This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3b426d1cc22f CAMEL-21438: De-flake timing-sensitive component tests
3b426d1cc22f is described below
commit 3b426d1cc22f5383d19aba71a0e38c07d40889d0
Author: Adriano Machado <[email protected]>
AuthorDate: Tue Jun 30 16:33:41 2026 -0400
CAMEL-21438: De-flake timing-sensitive component tests
Remove timing races in camel-atom, camel-pg-replication-slot, and
camel-elasticsearch-rest-client tests. Atom polling tests now use
repeatCount to bound poll cycles deterministically. Elasticsearch IT
replaces Thread.sleep(5000) with an Awaitility readiness probe against
_cluster/health. PgReplicationSlot IT polls faster to avoid racing
the assertion timeout. Test-only changes, no component behavior changes.
Closes #24338
Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
.../atom/AtomEntryPollingConsumerTest.java | 3 ++-
.../atom/AtomPollingConsumerIdleMessageTest.java | 22 ++++++++++------------
.../component/atom/AtomPollingConsumerTest.java | 6 ++++--
.../component/atom/AtomPollingLowDelayTest.java | 5 +++--
.../component/atom/AtomPollingUnthrottledTest.java | 4 ++--
.../ElasticsearchRestClientComponentIT.java | 10 ++++++++--
.../slot/integration/PgReplicationSlotCamelIT.java | 10 ++++++----
7 files changed, 35 insertions(+), 25 deletions(-)
diff --git
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
index ce868c634b0f..ebf452e3f82b 100644
---
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
+++
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
@@ -42,7 +42,8 @@ public class AtomEntryPollingConsumerTest extends
CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
-
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=500")
+ // throttled: one entry per poll; repeatCount=7 bounds it to
exactly the 7 feed entries
+
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=100&initialDelay=0&repeatCount=7")
.to("mock:result1");
}
};
diff --git
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
index 80f9a28a0c6e..3cec836e384e 100644
---
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
+++
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
@@ -16,12 +16,9 @@
*/
package org.apache.camel.component.atom;
-import java.time.Duration;
-
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit6.CamelTestSupport;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -36,15 +33,16 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public class AtomPollingConsumerIdleMessageTest extends CamelTestSupport {
@Test
- void testConsumeIdleMessages() {
- Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMinimumMessageCount(2);
- MockEndpoint.assertIsSatisfied(context);
-
- assertNull(mock.getExchanges().get(0).getIn().getBody());
- assertNull(mock.getExchanges().get(1).getIn().getBody());
- });
+ void testConsumeIdleMessages() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ // an empty feed polled with sendEmptyMessageWhenIdle=true emits one
idle exchange per poll;
+ // let assertIsSatisfied do the waiting (default result wait time)
instead of a fixed deadline
+ mock.expectedMinimumMessageCount(2);
+ mock.assertIsSatisfied();
+
+ // idle exchanges carry no body
+ assertNull(mock.getExchanges().get(0).getIn().getBody());
+ assertNull(mock.getExchanges().get(1).getIn().getBody());
}
@Override
diff --git
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
index 505e3812fb67..96c1196c8e22 100644
---
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
+++
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
@@ -75,10 +75,12 @@ public class AtomPollingConsumerTest extends
CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
-
from("atom:file:src/test/data/feed.atom?splitEntries=false").to("mock:result");
+ // not split: a single poll delivers the whole feed;
repeatCount=1 keeps it to exactly one message
+
from("atom:file:src/test/data/feed.atom?splitEntries=false&initialDelay=0&repeatCount=1").to("mock:result");
// this is a bit weird syntax that normally is not using the
feedUri parameter
-
from("atom:?feedUri=file:src/test/data/feed.atom&splitEntries=false").to("mock:result2");
+
from("atom:?feedUri=file:src/test/data/feed.atom&splitEntries=false&initialDelay=0&repeatCount=1")
+ .to("mock:result2");
}
};
}
diff --git
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
index 774c57c05b78..d32458e90c2c 100644
---
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
+++
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
@@ -33,7 +33,6 @@ public class AtomPollingLowDelayTest extends CamelTestSupport
{
void testLowDelay() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(7);
- mock.setResultWaitTime(3000L);
mock.assertIsSatisfied();
}
@@ -41,7 +40,9 @@ public class AtomPollingLowDelayTest extends CamelTestSupport
{
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
-
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=100&initialDelay=0").to("mock:result");
+ // throttled fast polling: one entry per poll; repeatCount=7
bounds it to exactly the 7 feed entries
+
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=100&initialDelay=0&repeatCount=7")
+ .to("mock:result");
}
};
}
diff --git
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
index 4dd629bf969d..e066b54d05fb 100644
---
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
+++
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
@@ -30,7 +30,6 @@ public class AtomPollingUnthrottledTest extends
CamelTestSupport {
void testLowDelay() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(7);
- mock.setResultWaitTime(3000L);
MockEndpoint.assertIsSatisfied(context);
}
@@ -39,7 +38,8 @@ public class AtomPollingUnthrottledTest extends
CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
-
from("atom:file:src/test/data/feed.atom?splitEntries=true&throttleEntries=false&initialDelay=0")
+ // unthrottled: a single poll delivers all entries, so
repeatCount=1 yields exactly the 7 feed entries
+
from("atom:file:src/test/data/feed.atom?splitEntries=true&throttleEntries=false&initialDelay=0&repeatCount=1")
.to("mock:result");
}
};
diff --git
a/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
b/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
index 8abd876d8b9e..6b49b5b14395 100644
---
a/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
+++
b/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
@@ -28,6 +28,8 @@ import
org.apache.camel.component.elasticsearch.rest.client.ElasticSearchRestCli
import
org.apache.camel.component.elasticsearch.rest.client.ElasticsearchRestClientOperation;
import org.apache.camel.component.mock.MockEndpoint;
import org.awaitility.Awaitility;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -82,8 +84,12 @@ public class ElasticsearchRestClientComponentIT extends
ElasticsearchRestClientI
@Test
void testProducer() throws ExecutionException, InterruptedException {
- // Workaround to avoid the Credential Provider to not be ready and to
receive a 401
- Thread.sleep(5000);
+ // Wait until Elasticsearch security is ready so authenticated
requests succeed, instead of
+ // sleeping a fixed amount and hoping the credential provider is ready
(which races and 401s).
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ Response health = restClient.performRequest(new Request("GET",
"/_cluster/health"));
+ assertEquals(200, health.getStatusLine().getStatusCode());
+ });
// create index
CompletableFuture<Boolean> ack =
template.asyncRequestBody("direct:create-index", null, Boolean.class);
diff --git
a/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
b/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
index 21bfea73aa5c..c111a64f3cd0 100644
---
a/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
+++
b/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
@@ -59,10 +59,13 @@ public class PgReplicationSlotCamelIT extends
PgReplicationITSupport {
@Override
public void configure() {
+ // poll quickly: this consumer delivers one decoded message
per poll, so the default
+ // 1s initial delay + 500ms cadence can take ~3.5s for 6
messages, which raced the former 5s assertion timeout under CI load
String uriFormat
=
"pg-replication-slot://{{postgres.service.address}}/camel/camel_test_slot:test_decoding?"
+
"user={{postgres.user.name}}&password={{postgres.user.password}}"
- +
"&slotOptions.skip-empty-xacts=true&slotOptions.include-xids=false";
+ +
"&slotOptions.skip-empty-xacts=true&slotOptions.include-xids=false"
+ + "&initialDelay=200&delay=200";
from(uriFormat).to(mockEndpoint);
}
@@ -71,10 +74,9 @@ public class PgReplicationSlotCamelIT extends
PgReplicationITSupport {
@Test
public void canReceiveFromSlot() throws InterruptedException, SQLException
{
- mockEndpoint.expectedMessageCount(1);
-
// test_decoding plugin writes each change in a separate message. Some
other plugins can have different behaviour,
// wal2json default behaviour is to write the whole transaction in one
message.
+ // expectedBodiesReceived pins the exact count (6), order and content.
mockEndpoint.expectedBodiesReceived("BEGIN", "table
public.camel_test_table: INSERT: id[integer]:1984", "COMMIT",
"BEGIN", "table public.camel_test_table: INSERT:
id[integer]:1998", "COMMIT");
@@ -83,6 +85,6 @@ public class PgReplicationSlotCamelIT extends
PgReplicationITSupport {
statement.execute("INSERT INTO camel_test_table(id)
VALUES(1998);");
}
- mockEndpoint.assertIsSatisfied(5000);
+ mockEndpoint.assertIsSatisfied();
}
}