This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch camel-4.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit b44f64e9881dbc96dc073510360aa36d42d76792 Author: Pasquale Congiusti <[email protected]> AuthorDate: Wed Oct 15 09:59:09 2025 +0200 fix(core): file lock critical section Make sure that only one process at a time can access the file lock generation. Closes CAMEL-22541 --- .../file/cluster/FileLockClusterView.java | 44 ++++++++++++++-------- .../FileLockClusteredRoutePolicyFactoryTest.java | 39 ++++++++++++++----- 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java index 0fe775ae582e..a5d8d75e8875 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.cluster.CamelClusterMember; import org.apache.camel.support.cluster.AbstractCamelClusterView; @@ -40,6 +41,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FileLockClusterView extends AbstractCamelClusterView { + + // Used only during service startup as each context could try to access it concurrently. + // It isolates the critical section making sure only one service creates the files. + private static ReentrantLock contextStartLock = new ReentrantLock(); + private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterView.class); private final ClusterMember localMember; private final Path leaderLockPath; @@ -82,21 +88,29 @@ public class FileLockClusterView extends AbstractCamelClusterView { @Override protected void doStart() throws Exception { - if (leaderLockFile != null) { - closeInternal(); - fireLeadershipChangedEvent((CamelClusterMember) null); - } + // Start critical section + try { + contextStartLock.lock(); - if (!Files.exists(leaderLockPath.getParent())) { - Files.createDirectories(leaderLockPath.getParent()); - } + if (leaderLockFile != null) { + closeInternal(); + fireLeadershipChangedEvent((CamelClusterMember) null); + } - if (!Files.exists(leaderLockPath)) { - Files.createFile(leaderLockPath); - } + if (!Files.exists(leaderLockPath.getParent())) { + Files.createDirectories(leaderLockPath.getParent()); + } - if (!Files.exists(leaderDataPath)) { - Files.createFile(leaderDataPath); + if (!Files.exists(leaderLockPath)) { + Files.createFile(leaderLockPath); + } + + if (!Files.exists(leaderDataPath)) { + Files.createFile(leaderDataPath); + } + } finally { + // End critical section + contextStartLock.unlock(); } FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class); @@ -150,7 +164,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { try { leaderLockFile.close(); } catch (Exception ignore) { - // Ignore + LOGGER.warn("{}", ignore.getMessage(), ignore); } leaderLockFile = null; } @@ -159,7 +173,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { try { leaderDataFile.close(); } catch (Exception ignore) { - // Ignore + LOGGER.warn("{}", ignore.getMessage(), ignore); } leaderDataFile = null; } @@ -170,7 +184,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { try { lock.release(); } catch (Exception ignore) { - // Ignore + LOGGER.warn("{}", ignore.getMessage(), ignore); } } } diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/cluster/FileLockClusteredRoutePolicyFactoryTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/cluster/FileLockClusteredRoutePolicyFactoryTest.java index 01caa740ce91..9e767322b4ae 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/cluster/FileLockClusteredRoutePolicyFactoryTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/cluster/FileLockClusteredRoutePolicyFactoryTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.file.cluster; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -30,7 +31,8 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,26 +42,38 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public final class FileLockClusteredRoutePolicyFactoryTest { private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusteredRoutePolicyFactoryTest.class); - private static final List<String> CLIENTS = List.of("0", "1", "2"); - private static final List<String> RESULTS = new ArrayList<>(); - private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size()); - private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + + private List<String> CLIENTS; + private List<String> RESULTS; + private ScheduledExecutorService SCHEDULER; + private CountDownLatch LATCH; + + @BeforeEach + public void init() { + CLIENTS = List.of("0", "1", "2"); + RESULTS = new ArrayList<>(); + SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size()); + LATCH = new CountDownLatch(CLIENTS.size()); + } @TempDir - private static Path tempDir; + private Path tempDir; // ************************************ // Test // ************************************ - @Test + // Repeating the test more than once is required to understand if the file locking + // is managed properly and the locks are released upon context shutdown. + @RepeatedTest(5) public void test() throws Exception { for (String id : CLIENTS) { SCHEDULER.submit(() -> run(id)); } LATCH.await(20, TimeUnit.SECONDS); - SCHEDULER.shutdownNow(); + List<Runnable> waitingTasks = SCHEDULER.shutdownNow(); + assertEquals(0, waitingTasks.size(), "All scheduled tasks should have been completed!"); assertEquals(CLIENTS.size(), RESULTS.size()); assertTrue(RESULTS.containsAll(CLIENTS)); @@ -70,6 +84,8 @@ public final class FileLockClusteredRoutePolicyFactoryTest { // ************************************ private void run(String id) { + LOGGER.info("Starting task using file lock cluster service {}/node-{}", tempDir.toString(), id); + DefaultCamelContext context = new DefaultCamelContext(); try { int events = ThreadLocalRandom.current().nextInt(2, 6); CountDownLatch contextLatch = new CountDownLatch(events); @@ -80,7 +96,6 @@ public final class FileLockClusteredRoutePolicyFactoryTest { service.setAcquireLockDelay(100, TimeUnit.MILLISECONDS); service.setAcquireLockInterval(100, TimeUnit.MILLISECONDS); - DefaultCamelContext context = new DefaultCamelContext(); context.disableJMX(); context.getCamelContextExtension().setName("context-" + id); context.addService(service); @@ -110,6 +125,12 @@ public final class FileLockClusteredRoutePolicyFactoryTest { LATCH.countDown(); } catch (Exception e) { LOGGER.warn("{}", e.getMessage(), e); + } finally { + try { + context.close(); + } catch (IOException e) { + LOGGER.warn("{}", e.getMessage(), e); + } } } }
