This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 621ff9ebf486 CAMEL-22539: Fix flaky unit tests in camel-core
621ff9ebf486 is described below

commit 621ff9ebf486ced750a5c043efd575534c34479e
Author: Guillaume Nodet <[email protected]>
AuthorDate: Wed May 6 08:22:55 2026 +0200

    CAMEL-22539: Fix flaky unit tests in camel-core
    
    - FileConsumerIdempotentTest: wait for post-processing before moving file 
back
    - FileConsumerIdempotentKeyNameAndSizeTest: re-enable on CI
    - FileConsumerIncludeExtTest: increase initialDelay to prevent reading 
partial files
    - MulticastParallelStreamingTimeoutTest: increase timing margins, re-enable 
on CI
    - MulticastParallelTimeoutStreamCachingTest: replace Thread.sleep with 
delay() EIP and Awaitility, re-enable on CI
    - ResequenceBatchNotIgnoreInvalidExchangesTest: increase batch timeout
    - AggregateExpressionTimeoutPerGroupTest: use 
expectedBodiesReceivedInAnyOrder
    - AggregateTimeoutOnlyTest: increase completionTimeout
---
 .../FileConsumerIdempotentKeyNameAndSizeTest.java  |  2 -
 .../component/file/FileConsumerIdempotentTest.java |  7 ++-
 .../component/file/FileConsumerIncludeExtTest.java |  2 +-
 .../MulticastParallelStreamingTimeoutTest.java     |  6 +--
 .../MulticastParallelTimeoutStreamCachingTest.java | 51 +++++++++++-----------
 ...sequenceBatchNotIgnoreInvalidExchangesTest.java |  2 +-
 .../AggregateExpressionTimeoutPerGroupTest.java    |  2 +-
 .../aggregator/AggregateTimeoutOnlyTest.java       | 10 ++---
 8 files changed, 41 insertions(+), 41 deletions(-)

diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
index 6b18f40b325a..8e046a466a82 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyNameAndSizeTest.java
@@ -20,12 +20,10 @@ import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 /**
  * Unit test for the idempotentKey option.
  */
-@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*", 
disabledReason = "Flaky on Github CI")
 public class FileConsumerIdempotentKeyNameAndSizeTest extends 
FileConsumerIdempotentTest {
 
     @Override
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
index 5e254daa591c..64ef838ab257 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
@@ -57,6 +57,10 @@ public class FileConsumerIdempotentTest extends 
ContextTestSupport {
 
         oneExchangeDone.matchesWaitTime();
 
+        // wait for the file to be fully post-processed (moved to done/ and 
idempotent key committed)
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> Files.exists(testFile("done/report.txt")));
+
         // reset mock and set new expectations
         mock.reset();
         mock.expectedMessageCount(0);
@@ -66,7 +70,8 @@ public class FileConsumerIdempotentTest extends 
ContextTestSupport {
 
         // should NOT consume the file again, let a bit time pass to let the
         // consumer try to consume it but it should not
-        Awaitility.await().pollDelay(100, 
TimeUnit.MILLISECONDS).untilAsserted(() -> assertMockEndpointsSatisfied());
+        Awaitility.await().pollDelay(1, TimeUnit.SECONDS).atMost(2, 
TimeUnit.SECONDS)
+                .untilAsserted(() -> assertMockEndpointsSatisfied());
 
         FileEndpoint fe = context.getEndpoint(fileUri(), FileEndpoint.class);
         assertNotNull(fe);
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
index 5f99a6007512..1bb0106f56f7 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIncludeExtTest.java
@@ -41,7 +41,7 @@ public class FileConsumerIncludeExtTest extends 
ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                from(fileUri("?initialDelay=0&delay=10&includeExt=txt,dat"))
+                from(fileUri("?initialDelay=250&delay=10&includeExt=txt,dat"))
                         .to("mock:txt");
             }
         };
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
index 55580ee04039..ba4a4e23a5cc 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
@@ -22,10 +22,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 
-@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*", 
disabledReason = "Flaky on Github CI")
 @DisabledOnOs(architectures = { "s390x" },
               disabledReason = "This test does not run reliably on s390x (see 
CAMEL-21438)")
 public class MulticastParallelStreamingTimeoutTest extends ContextTestSupport {
@@ -56,11 +54,11 @@ public class MulticastParallelStreamingTimeoutTest extends 
ContextTestSupport {
                         oldExchange.getIn().setBody(body + 
newExchange.getIn().getBody(String.class));
                         return oldExchange;
                     }
-                
}).parallelProcessing().streaming().timeout(2000).to("direct:a", "direct:b", 
"direct:c")
+                
}).parallelProcessing().streaming().timeout(5000).to("direct:a", "direct:b", 
"direct:c")
                         // use end to indicate end of multicast route
                         .end().to("mock:result");
 
-                from("direct:a").delay(3000).setBody(constant("A"));
+                from("direct:a").delay(10000).setBody(constant("A"));
 
                 from("direct:b").delay(500).setBody(constant("B"));
 
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
index 9f38d47a36d3..f579b9f2bb22 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
@@ -28,8 +29,8 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.converter.stream.CachedOutputStream;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -39,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  *
  */
-@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*", 
disabledReason = "Unreliable on virtual machines")
 @DisabledOnOs(architectures = { "s390x" },
               disabledReason = "This test does not run reliably on s390x")
 public class MulticastParallelTimeoutStreamCachingTest extends 
ContextTestSupport {
@@ -56,10 +56,12 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
 
         File f = testDirectory().toFile();
         assertTrue(f.isDirectory());
-        Thread.sleep(500L); // deletion happens asynchron
-        File[] files = f.listFiles();
-        assertNotNull(files, "There should be a list of files");
-        assertEquals(0, files.length);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    File[] files = f.listFiles();
+                    assertNotNull(files, "There should be a list of files");
+                    assertEquals(0, files.length);
+                });
     }
 
     @Test
@@ -73,14 +75,8 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
 
     @Override
     protected RouteBuilder createRouteBuilder() {
-        final Processor processor1 = new Processor() {
+        final Processor setStreamBody = new Processor() {
             public void process(Exchange exchange) {
-                try {
-                    // sleep for one second so that the stream cache is built 
after the main exchange has finished due to timeout on the multicast
-                    Thread.sleep(1000L);
-                } catch (InterruptedException e) {
-                    throw new IllegalStateException("Unexpected exception", e);
-                }
                 Message in = exchange.getIn();
                 // use FilterInputStream to trigger streamcaching
                 in.setBody(new FilterInputStream(new 
ByteArrayInputStream(BODY)) {
@@ -89,16 +85,16 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
             }
         };
 
-        final Processor processor2 = new Processor() {
-            public void process(Exchange exchange) throws IOException {
-                // create first the OutputStreamCache and then sleep
+        final Processor createOutputStream = new Processor() {
+            public void process(Exchange exchange) {
                 CachedOutputStream outputStream = new 
CachedOutputStream(exchange);
-                try {
-                    // sleep for one second so that the write to the 
CachedOutputStream happens after the main exchange has finished due to timeout 
on the multicast
-                    Thread.sleep(1000L);
-                } catch (InterruptedException e) {
-                    throw new IllegalStateException("Unexpected exception", e);
-                }
+                exchange.setProperty("cachedOutputStream", outputStream);
+            }
+        };
+
+        final Processor writeOutputStream = new Processor() {
+            public void process(Exchange exchange) throws IOException {
+                CachedOutputStream outputStream = 
exchange.getProperty("cachedOutputStream", CachedOutputStream.class);
                 outputStream.write(BODY);
                 Message in = exchange.getIn();
                 // use FilterInputStream to trigger streamcaching
@@ -118,13 +114,16 @@ public class MulticastParallelTimeoutStreamCachingTest 
extends ContextTestSuppor
 
                 onException(IOException.class).to("mock:exception");
 
-                
from("direct:a").multicast().timeout(500L).parallelProcessing().to("direct:x");
+                
from("direct:a").multicast().timeout(2000).parallelProcessing().to("direct:x");
 
-                from("direct:x").process(processor1).to("mock:x");
+                // delay so the stream cache is built after the main exchange 
has finished due to timeout
+                
from("direct:x").delay(5000).process(setStreamBody).to("mock:x");
 
-                
from("direct:b").multicast().timeout(500L).parallelProcessing().to("direct:y");
+                
from("direct:b").multicast().timeout(2000).parallelProcessing().to("direct:y");
 
-                from("direct:y").process(processor2).to("mock:y");
+                // create the CachedOutputStream before the delay, then write 
to it after
+                // the delay (which is after the multicast timeout), which 
should cause an IOException
+                
from("direct:y").process(createOutputStream).delay(5000).process(writeOutputStream).to("mock:y");
             }
         };
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
index 8766d048c486..4a7be2a06dc4 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
@@ -30,7 +30,7 @@ public class ResequenceBatchNotIgnoreInvalidExchangesTest 
extends ResequenceStre
         return new RouteBuilder() {
             @Override
             public void configure() {
-                
from("direct:start").resequence(header("seqno")).batch().timeout(150).to("mock:result");
+                
from("direct:start").resequence(header("seqno")).batch().timeout(1000).to("mock:result");
             }
         };
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
index 6a344ec4ab29..4bc5b21a2074 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutPerGroupTest.java
@@ -28,7 +28,7 @@ public class AggregateExpressionTimeoutPerGroupTest extends 
ContextTestSupport {
 
     @Test
     public void testAggregateExpressionPerGroupTimeout() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("G+H+I", 
"D+E+F", "A+B+C");
+        
getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("G+H+I", 
"D+E+F", "A+B+C");
 
         // will use fallback timeout (1 sec)
         template.sendBodyAndHeader("direct:start", "A", "id", 789);
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
index 31682539f94c..a5523a327238 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
@@ -33,8 +33,8 @@ public class AggregateTimeoutOnlyTest extends 
ContextTestSupport {
         // by default the use latest aggregation strategy is used so we get
         // message 9
         result.expectedBodiesReceived("Message 9");
-        // should take 0.1 seconds to complete this one
-        result.setResultMinimumWaitTime(90);
+        // should take about 1 second to complete this one
+        result.setResultMinimumWaitTime(900);
 
         for (int i = 0; i < 10; i++) {
             template.sendBodyAndHeader("direct:start", "Message " + i, "id", 
"1");
@@ -50,9 +50,9 @@ public class AggregateTimeoutOnlyTest extends 
ContextTestSupport {
             public void configure() {
                 // START SNIPPET: e1
                 from("direct:start")
-                        // aggregate timeout after 0.1 second
-                        .aggregate(header("id"), new 
UseLatestAggregationStrategy()).completionTimeout(100)
-                        
.completionTimeoutCheckerInterval(10).to("mock:result");
+                        // aggregate timeout after 1 second
+                        .aggregate(header("id"), new 
UseLatestAggregationStrategy()).completionTimeout(1000)
+                        
.completionTimeoutCheckerInterval(50).to("mock:result");
                 // END SNIPPET: e1
             }
         };

Reply via email to