This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 06bfff6de72c CAMEL-22524: Add HazelcastRoutePolicy integration test
06bfff6de72c is described below

commit 06bfff6de72c1b58eb5588b6245a8d7216131ca2
Author: Guillaume Nodet <[email protected]>
AuthorDate: Wed Apr 8 23:56:19 2026 +0200

    CAMEL-22524: Add HazelcastRoutePolicy integration test
    
    Automate the manual HazelcastRoutePolicyMain test as a proper JUnit 5 IT:
    
    - 3 concurrent nodes with deterministic staggered startup, each with its own
      embedded HazelcastInstance and HazelcastRoutePolicy
    - Verifies distributed lock-based leader election: all nodes eventually
      acquire the lock and execute their routes
    - Thread-safe design: CopyOnWriteArrayList, local state, no static mutables
    - Try-with-resources for CamelContext; manual finally for HazelcastInstance
    - @Timeout(2 min) to prevent CI hangs
    - AssertJ assertions with descriptive messages
    - Added assertj-core test-scoped dependency to camel-hazelcast
---
 components/camel-hazelcast/pom.xml                 |  5 ++
 .../hazelcast/policy/HazelcastRoutePolicyIT.java   | 98 ++++++++++++----------
 2 files changed, 58 insertions(+), 45 deletions(-)

diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml
index 2511a42c396d..cb4f246d7639 100644
--- a/components/camel-hazelcast/pom.xml
+++ b/components/camel-hazelcast/pom.xml
@@ -79,6 +79,11 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java
index 05ae33a243ba..7d841943c2b4 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java
@@ -16,14 +16,13 @@
  */
 package org.apache.camel.component.hazelcast.policy;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.IntStream;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.core.Hazelcast;
@@ -32,12 +31,14 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.test.infra.hazelcast.services.HazelcastService;
 import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast
  * distributed locks.
@@ -45,35 +46,37 @@ import org.slf4j.LoggerFactory;
 public class HazelcastRoutePolicyIT {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicyIT.class);
+    private static final List<String> CLIENTS = List.of("0", "1", "2");
 
     @RegisterExtension
     public static HazelcastService hazelcastService = HazelcastServiceFactory.createService();
 
-    private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).toList();
-    private static final List<String> RESULTS = new ArrayList<>();
-    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
-    private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
-
     @Test
-    public void test() throws Exception {
+    @Timeout(value = 2, unit = TimeUnit.MINUTES)
+    public void testLeaderElectionWithMultipleNodes() throws Exception {
+        List<String> results = new CopyOnWriteArrayList<>();
+        CountDownLatch latch = new CountDownLatch(CLIENTS.size());
+        ExecutorService executor = Executors.newFixedThreadPool(CLIENTS.size());
+
         for (String id : CLIENTS) {
-            SCHEDULER.submit(() -> run(id));
+            executor.submit(() -> run(id, results, latch));
         }
 
-        LATCH.await(1, TimeUnit.MINUTES);
-        SCHEDULER.shutdownNow();
+        assertThat(latch.await(1, TimeUnit.MINUTES)).as("All nodes should complete within timeout").isTrue();
+        executor.shutdown();
+        assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).as("Executor should terminate cleanly").isTrue();
 
-        Assertions.assertEquals(CLIENTS.size(), RESULTS.size());
-        Assertions.assertTrue(RESULTS.containsAll(CLIENTS));
+        assertThat(results).containsExactlyInAnyOrderElementsOf(CLIENTS);
     }
 
-    private static void run(String id) {
+    private static void run(String id, List<String> results, CountDownLatch latch) {
+        HazelcastInstance instance = null;
         try {
             int events = ThreadLocalRandom.current().nextInt(2, 6);
             CountDownLatch contextLatch = new CountDownLatch(events);
 
             Config config = hazelcastService.createConfiguration(null, 0, "node-" + id, "set");
-            HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
+            instance = Hazelcast.newHazelcastInstance(config);
 
             HazelcastRoutePolicy policy = new HazelcastRoutePolicy(instance);
             policy.setLockMapName("camel-route-policy");
@@ -81,36 +84,41 @@ public class HazelcastRoutePolicyIT {
             policy.setLockValue("node-" + id);
             policy.setTryLockTimeout(5, TimeUnit.SECONDS);
 
-            DefaultCamelContext context = new DefaultCamelContext();
-            context.disableJMX();
-            context.getCamelContextExtension().setName("context-" + id);
-            context.addRoutes(new RouteBuilder() {
-                @Override
-                public void configure() {
-                    from("timer:hazelcast?delay=1000&period=1000")
-                            .routeId("route-" + id)
-                            .routePolicy(policy)
-                            .log("From ${routeId}")
-                            .process(e -> contextLatch.countDown());
+            try (DefaultCamelContext context = new DefaultCamelContext()) {
+                context.disableJMX();
+                context.getCamelContextExtension().setName("context-" + id);
+                context.addRoutes(new RouteBuilder() {
+                    @Override
+                    public void configure() {
+                        from("timer:hazelcast?delay=1000&period=1000")
+                                .routeId("route-" + id)
+                                .routePolicy(policy)
+                                .log("From ${routeId}")
+                                .process(e -> contextLatch.countDown());
+                    }
+                });
+
+                // Deterministic staggered startup based on node index
+                Thread.sleep(Integer.parseInt(id) * 200L);
+
+                LOGGER.info("Starting CamelContext on node: {}", id);
+                context.start();
+                LOGGER.info("Started CamelContext on node: {}", id);
+
+                if (contextLatch.await(30, TimeUnit.SECONDS)) {
+                    LOGGER.info("Node {} completed {} events successfully", id, events);
+                    results.add(id);
+                } else {
+                    LOGGER.warn("Node {} timed out waiting for route events (expected {} events)", id, events);
                 }
-            });
-
-            // Staggered startup
-            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
-
-            LOGGER.info("Starting CamelContext on node: {}", id);
-            context.start();
-            LOGGER.info("Started CamelContext on node: {}", id);
-
-            contextLatch.await(30, TimeUnit.SECONDS);
-
-            LOGGER.info("Shutting down node {}", id);
-            RESULTS.add(id);
-            context.stop();
-            instance.shutdown();
-            LATCH.countDown();
+            }
         } catch (Exception e) {
-            LOGGER.warn("{}", e.getMessage(), e);
+            LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e);
+        } finally {
+            if (instance != null) {
+                instance.shutdown();
+            }
+            latch.countDown();
         }
     }
 }

Reply via email to