This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 621ff9ebf486 CAMEL-22539: Fix flaky unit tests in camel-core
621ff9ebf486 is described below
commit 621ff9ebf486ced750a5c043efd575534c34479e
Author: Guillaume Nodet <[email protected]>
AuthorDate: Wed May 6 08:22:55 2026 +0200
CAMEL-22539: Fix flaky unit tests in camel-core
- FileConsumerIdempotentTest: wait for post-processing before moving file
back
- FileConsumerIdempotentKeyNameAndSizeTest: re-enable on CI
- FileConsumerIncludeExtTest: increase initialDelay to prevent reading
partial files
- MulticastParallelStreamingTimeoutTest: increase timing margins, re-enable
on CI
- MulticastParallelTimeoutStreamCachingTest: replace Thread.sleep with
delay() EIP and Awaitility, re-enable on CI
- ResequenceBatchNotIgnoreInvalidExchangesTest: increase batch timeout
- AggregateExpressionTimeoutPerGroupTest: use
expectedBodiesReceivedInAnyOrder
- AggregateTimeoutOnlyTest: increase completionTimeout
---
.../FileConsumerIdempotentKeyNameAndSizeTest.java | 2 -
.../component/file/FileConsumerIdempotentTest.java | 7 ++-
.../component/file/FileConsumerIncludeExtTest.java | 2 +-
.../MulticastParallelStreamingTimeoutTest.java | 6 +--
.../MulticastParallelTimeoutStreamCachingTest.java | 51 +++++++++++-----------
...sequenceBatchNotIgnoreInvalidExchangesTest.java | 2 +-
.../AggregateExpressionTimeoutPerGroupTest.java | 2 +-
.../aggregator/AggregateTimeoutOnlyTest.java | 10 ++---
8 files changed, 41 insertions(+), 41 deletions(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
index 6b18f40b325a..8e046a466a82 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
@@ -20,12 +20,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
/**
* Unit test for the idempotentKey option.
*/
-@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*",
disabledReason = "Flaky on Github CI")
public class FileConsumerIdempotentKeyNameAndSizeTest extends
FileConsumerIdempotentTest {
@Override
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
index 5e254daa591c..64ef838ab257 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
@@ -57,6 +57,10 @@ public class FileConsumerIdempotentTest extends
ContextTestSupport {
oneExchangeDone.matchesWaitTime();
+ // wait for the file to be fully post-processed (moved to done/ and
idempotent key committed)
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> Files.exists(testFile("done/report.txt")));
+
// reset mock and set new expectations
mock.reset();
mock.expectedMessageCount(0);
@@ -66,7 +70,8 @@ public class FileConsumerIdempotentTest extends
ContextTestSupport {
// should NOT consume the file again, let a bit time pass to let the
// consumer try to consume it but it should not
- Awaitility.await().pollDelay(100,
TimeUnit.MILLISECONDS).untilAsserted(() -> assertMockEndpointsSatisfied());
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS).atMost(2,
TimeUnit.SECONDS)
+ .untilAsserted(() -> assertMockEndpointsSatisfied());
FileEndpoint fe = context.getEndpoint(fileUri(), FileEndpoint.class);
assertNotNull(fe);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
index 5f99a6007512..1bb0106f56f7 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
@@ -41,7 +41,7 @@ public class FileConsumerIncludeExtTest extends
ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- from(fileUri("?initialDelay=0&delay=10&includeExt=txt,dat"))
+ from(fileUri("?initialDelay=250&delay=10&includeExt=txt,dat"))
.to("mock:txt");
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
index 55580ee04039..ba4a4e23a5cc 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
@@ -22,10 +22,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
-@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*",
disabledReason = "Flaky on Github CI")
@DisabledOnOs(architectures = { "s390x" },
disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
public class MulticastParallelStreamingTimeoutTest extends ContextTestSupport {
@@ -56,11 +54,11 @@ public class MulticastParallelStreamingTimeoutTest extends
ContextTestSupport {
oldExchange.getIn().setBody(body +
newExchange.getIn().getBody(String.class));
return oldExchange;
}
-
}).parallelProcessing().streaming().timeout(2000).to("direct:a", "direct:b",
"direct:c")
+
}).parallelProcessing().streaming().timeout(5000).to("direct:a", "direct:b",
"direct:c")
// use end to indicate end of multicast route
.end().to("mock:result");
- from("direct:a").delay(3000).setBody(constant("A"));
+ from("direct:a").delay(10000).setBody(constant("A"));
from("direct:b").delay(500).setBody(constant("B"));
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
index 9f38d47a36d3..f579b9f2bb22 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -28,8 +29,8 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.converter.stream.CachedOutputStream;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -39,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
*
*/
-@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*",
disabledReason = "Unreliable on virtual machines")
@DisabledOnOs(architectures = { "s390x" },
disabledReason = "This test does not run reliably on s390x")
public class MulticastParallelTimeoutStreamCachingTest extends
ContextTestSupport {
@@ -56,10 +56,12 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
File f = testDirectory().toFile();
assertTrue(f.isDirectory());
- Thread.sleep(500L); // deletion happens asynchron
- File[] files = f.listFiles();
- assertNotNull(files, "There should be a list of files");
- assertEquals(0, files.length);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> {
+ File[] files = f.listFiles();
+ assertNotNull(files, "There should be a list of files");
+ assertEquals(0, files.length);
+ });
}
@Test
@@ -73,14 +75,8 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
@Override
protected RouteBuilder createRouteBuilder() {
- final Processor processor1 = new Processor() {
+ final Processor setStreamBody = new Processor() {
public void process(Exchange exchange) {
- try {
- // sleep for one second so that the stream cache is built
after the main exchange has finished due to timeout on the multicast
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Unexpected exception", e);
- }
Message in = exchange.getIn();
// use FilterInputStream to trigger streamcaching
in.setBody(new FilterInputStream(new
ByteArrayInputStream(BODY)) {
@@ -89,16 +85,16 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
}
};
- final Processor processor2 = new Processor() {
- public void process(Exchange exchange) throws IOException {
- // create first the OutputStreamCache and then sleep
+ final Processor createOutputStream = new Processor() {
+ public void process(Exchange exchange) {
CachedOutputStream outputStream = new
CachedOutputStream(exchange);
- try {
- // sleep for one second so that the write to the
CachedOutputStream happens after the main exchange has finished due to timeout
on the multicast
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- throw new IllegalStateException("Unexpected exception", e);
- }
+ exchange.setProperty("cachedOutputStream", outputStream);
+ }
+ };
+
+ final Processor writeOutputStream = new Processor() {
+ public void process(Exchange exchange) throws IOException {
+ CachedOutputStream outputStream =
exchange.getProperty("cachedOutputStream", CachedOutputStream.class);
outputStream.write(BODY);
Message in = exchange.getIn();
// use FilterInputStream to trigger streamcaching
@@ -118,13 +114,16 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
onException(IOException.class).to("mock:exception");
-
from("direct:a").multicast().timeout(500L).parallelProcessing().to("direct:x");
+
from("direct:a").multicast().timeout(2000).parallelProcessing().to("direct:x");
- from("direct:x").process(processor1).to("mock:x");
+ // delay so the stream cache is built after the main exchange
has finished due to timeout
+
from("direct:x").delay(5000).process(setStreamBody).to("mock:x");
-
from("direct:b").multicast().timeout(500L).parallelProcessing().to("direct:y");
+
from("direct:b").multicast().timeout(2000).parallelProcessing().to("direct:y");
- from("direct:y").process(processor2).to("mock:y");
+ // create the CachedOutputStream before the delay, then write
to it after
+ // the delay (which is after the multicast timeout), which
should cause an IOException
+
from("direct:y").process(createOutputStream).delay(5000).process(writeOutputStream).to("mock:y");
}
};
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
index 8766d048c486..4a7be2a06dc4 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
@@ -30,7 +30,7 @@ public class ResequenceBatchNotIgnoreInvalidExchangesTest
extends ResequenceStre
return new RouteBuilder() {
@Override
public void configure() {
-
from("direct:start").resequence(header("seqno")).batch().timeout(150).to("mock:result");
+
from("direct:start").resequence(header("seqno")).batch().timeout(1000).to("mock:result");
}
};
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
index 6a344ec4ab29..4bc5b21a2074 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
@@ -28,7 +28,7 @@ public class AggregateExpressionTimeoutPerGroupTest extends
ContextTestSupport {
@Test
public void testAggregateExpressionPerGroupTimeout() throws Exception {
- getMockEndpoint("mock:aggregated").expectedBodiesReceived("G+H+I",
"D+E+F", "A+B+C");
+
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("G+H+I",
"D+E+F", "A+B+C");
// will use fallback timeout (1 sec)
template.sendBodyAndHeader("direct:start", "A", "id", 789);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
index 31682539f94c..a5523a327238 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
@@ -33,8 +33,8 @@ public class AggregateTimeoutOnlyTest extends
ContextTestSupport {
// by default the use latest aggregation strategy is used so we get
// message 9
result.expectedBodiesReceived("Message 9");
- // should take 0.1 seconds to complete this one
- result.setResultMinimumWaitTime(90);
+ // should take about 1 second to complete this one
+ result.setResultMinimumWaitTime(900);
for (int i = 0; i < 10; i++) {
template.sendBodyAndHeader("direct:start", "Message " + i, "id",
"1");
@@ -50,9 +50,9 @@ public class AggregateTimeoutOnlyTest extends
ContextTestSupport {
public void configure() {
// START SNIPPET: e1
from("direct:start")
- // aggregate timeout after 0.1 second
- .aggregate(header("id"), new
UseLatestAggregationStrategy()).completionTimeout(100)
-
.completionTimeoutCheckerInterval(10).to("mock:result");
+ // aggregate timeout after 1 second
+ .aggregate(header("id"), new
UseLatestAggregationStrategy()).completionTimeout(1000)
+
.completionTimeoutCheckerInterval(50).to("mock:result");
// END SNIPPET: e1
}
};