This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch camel-4.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 381eecebb533451375be478f69f877bf4c8f7a6c Author: James Netherton <[email protected]> AuthorDate: Mon Dec 15 10:16:56 2025 +0000 CAMEL-22784: Fix FileLockClusterService failover reliability --- .../file/cluster/FileLockClusterLeaderInfo.java | 24 +++---- .../file/cluster/FileLockClusterService.java | 29 ++++++++ .../file/cluster/FileLockClusterUtils.java | 59 ++++++++-------- .../file/cluster/FileLockClusterView.java | 77 ++++++++++++++++----- .../file/cluster/FileLockClusterUtilsTest.java | 60 +++++++++-------- ...FileLockClusterServiceAdvancedFailoverTest.java | 2 +- .../FileLockClusterServiceBasicFailoverTest.java | 78 ++++++++++++++++++++++ .../cluster/FileLockClusterServiceTestBase.java | 10 +++ .../user-manual/modules/ROOT/pages/clustering.adoc | 37 +++++++--- 9 files changed, 281 insertions(+), 95 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java index 6d83d3357185..77f0ba55fe01 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java @@ -23,33 +23,33 @@ import java.util.Objects; */ final class FileLockClusterLeaderInfo { private final String id; - private final long heartbeatUpdateIntervalNanoseconds; - private final long heartbeatNanoseconds; + private final long heartbeatUpdateIntervalMilliseconds; + private final long heartbeatMilliseconds; /** * Constructs a {@link FileLockClusterLeaderInfo}. 
* - * @param id The unique UUID assigned to the cluster leader - * @param heartbeatUpdateIntervalNanoseconds The cluster leader heartbeat update interval value in nanoseconds - * @param heartbeatNanoseconds The cluster leader heartbeat value in nanoseconds + * @param id The unique UUID assigned to the cluster leader + * @param heartbeatUpdateIntervalMilliseconds The cluster leader heartbeat update interval value in milliseconds + * @param heartbeatMilliseconds The cluster leader heartbeat value in milliseconds */ - FileLockClusterLeaderInfo(String id, long heartbeatUpdateIntervalNanoseconds, long heartbeatNanoseconds) { + FileLockClusterLeaderInfo(String id, long heartbeatUpdateIntervalMilliseconds, long heartbeatMilliseconds) { Objects.requireNonNull(id); this.id = id; - this.heartbeatUpdateIntervalNanoseconds = heartbeatUpdateIntervalNanoseconds; - this.heartbeatNanoseconds = heartbeatNanoseconds; + this.heartbeatUpdateIntervalMilliseconds = heartbeatUpdateIntervalMilliseconds; + this.heartbeatMilliseconds = heartbeatMilliseconds; } String getId() { return id; } - long getHeartbeatNanoseconds() { - return heartbeatNanoseconds; + long getHeartbeatMilliseconds() { + return heartbeatMilliseconds; } - long getHeartbeatUpdateIntervalNanoseconds() { - return heartbeatUpdateIntervalNanoseconds; + long getHeartbeatUpdateIntervalMilliseconds() { + return heartbeatUpdateIntervalMilliseconds; } @Override diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java index e21568925f0a..5b53ce16b7c7 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java @@ -32,6 +32,8 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock private TimeUnit acquireLockIntervalUnit; private ScheduledExecutorService executor; private int heartbeatTimeoutMultiplier; + private long acquireLeadershipBackoff; + private TimeUnit acquireLeadershipBackoffIntervalUnit; public FileLockClusterService() { this.acquireLockDelay = 1; @@ -39,6 +41,8 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock this.acquireLockInterval = 10; this.acquireLockIntervalUnit = TimeUnit.SECONDS; this.heartbeatTimeoutMultiplier = 5; + this.acquireLeadershipBackoff = 0; + this.acquireLeadershipBackoffIntervalUnit = TimeUnit.SECONDS; } @Override @@ -127,6 +131,31 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock return heartbeatTimeoutMultiplier; } + /** + * The time to wait before a cluster member broadcasts acquisition of gaining cluster leadership. A value of 0 (the + * default) disables the backoff wait period. This can be useful to introduce a delay to ensure that the cluster + * leader has fully surrendered its leadership. A sensible value is acquireLockDelay + 5 seconds. Or whatever is + * appropriate for your environment and requirements. + */ + public void setAcquireLeadershipBackoff(long acquireLeadershipBackoff) { + this.acquireLeadershipBackoff = acquireLeadershipBackoff; + } + + public long getAcquireLeadershipBackoff() { + return acquireLeadershipBackoff; + } + + /** + * The time unit for the acquireLeadershipBackoff, default to TimeUnit.SECONDS. 
+ */ + public void setAcquireLeadershipBackoffIntervalUnit(TimeUnit acquireLeadershipBackoffIntervalUnit) { + this.acquireLeadershipBackoffIntervalUnit = acquireLeadershipBackoffIntervalUnit; + } + + public TimeUnit getAcquireLeadershipBackoffIntervalUnit() { + return acquireLeadershipBackoffIntervalUnit; + } + @Override protected void doStop() throws Exception { super.doStop(); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java index ebda14fe1044..fd3a7982cf9f 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.Objects; @@ -75,8 +76,8 @@ final class FileLockClusterUtils { ByteBuffer buf = ByteBuffer.allocate(LOCKFILE_BUFFER_SIZE); buf.put(uuidBytes); - buf.putLong(clusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds()); - buf.putLong(clusterLeaderInfo.getHeartbeatNanoseconds()); + buf.putLong(clusterLeaderInfo.getHeartbeatUpdateIntervalMilliseconds()); + buf.putLong(clusterLeaderInfo.getHeartbeatMilliseconds()); buf.flip(); if (forceMetaData) { @@ -100,26 +101,28 @@ final class FileLockClusterUtils { * @throws IOException If reading the lock file failed */ static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) throws IOException { - if (!Files.exists(leaderDataPath)) { - return null; - } - - byte[] bytes = Files.readAllBytes(leaderDataPath); - if (bytes.length < LOCKFILE_BUFFER_SIZE) { - // Data is incomplete or in a transient / corrupt state + try { + byte[] bytes = 
Files.readAllBytes(leaderDataPath); + + if (bytes.length < LOCKFILE_BUFFER_SIZE) { + // Data is incomplete or in a transient / corrupt state + return null; + } + + // Parse the cluster leader data + ByteBuffer buf = ByteBuffer.wrap(bytes); + byte[] uuidBytes = new byte[UUID_BYTE_LENGTH]; + buf.get(uuidBytes); + + String uuidStr = new String(uuidBytes, StandardCharsets.UTF_8); + long intervalMillis = buf.getLong(); + long lastHeartbeat = buf.getLong(); + + return new FileLockClusterLeaderInfo(uuidStr, intervalMillis, lastHeartbeat); + } catch (NoSuchFileException e) { + // Handle NoSuchFileException to give the ClusterView a chance to recreate the leadership data return null; } - - // Parse the cluster leader data - ByteBuffer buf = ByteBuffer.wrap(bytes); - byte[] uuidBytes = new byte[UUID_BYTE_LENGTH]; - buf.get(uuidBytes); - - String uuidStr = new String(uuidBytes, StandardCharsets.UTF_8); - long intervalNanos = buf.getLong(); - long lastHeartbeat = buf.getLong(); - - return new FileLockClusterLeaderInfo(uuidStr, intervalNanos, lastHeartbeat); } /** @@ -130,15 +133,15 @@ final class FileLockClusterUtils { * leader state * @param previousClusterLeaderInfo The {@link FileLockClusterLeaderInfo} instance representing the previously * recorded cluster leader state - * @param currentNanoTime The current time in nanoseconds, as returned by {@link System#nanoTime()} is - * held + * @param currentTimeMillis The current time in milliseconds, as returned by + * {@link System#currentTimeMillis()} is held * @return {@code true} if the leader is considered stale. 
{@code false} if the leader is * still active */ static boolean isLeaderStale( FileLockClusterLeaderInfo latestClusterLeaderInfo, FileLockClusterLeaderInfo previousClusterLeaderInfo, - long currentNanoTime, + long currentTimeMillis, int heartbeatTimeoutMultiplier) { if (latestClusterLeaderInfo == null) { @@ -150,8 +153,8 @@ final class FileLockClusterUtils { return false; } - final long latestHeartbeat = latestClusterLeaderInfo.getHeartbeatNanoseconds(); - final long previousObservedHeartbeat = previousClusterLeaderInfo.getHeartbeatNanoseconds(); + final long latestHeartbeat = latestClusterLeaderInfo.getHeartbeatMilliseconds(); + final long previousObservedHeartbeat = previousClusterLeaderInfo.getHeartbeatMilliseconds(); if (latestHeartbeat > previousObservedHeartbeat) { // Not stale. Cluster leader is alive and updating the lock file @@ -164,9 +167,9 @@ final class FileLockClusterUtils { } // Check if cluster leader has updated the lock file within acceptable limits - final long elapsed = currentNanoTime - previousObservedHeartbeat; - final long heartbeatUpdateIntervalNanoseconds = latestClusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds(); - final long timeout = heartbeatUpdateIntervalNanoseconds * (long) heartbeatTimeoutMultiplier; + final long elapsed = currentTimeMillis - previousObservedHeartbeat; + final long heartbeatUpdateIntervalMilliseconds = latestClusterLeaderInfo.getHeartbeatUpdateIntervalMilliseconds(); + final long timeout = heartbeatUpdateIntervalMilliseconds * (long) heartbeatTimeoutMultiplier; return elapsed > timeout; } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java index 8c6787bfcae8..64701a750b85 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java +++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java @@ -32,6 +32,7 @@ import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -45,18 +46,20 @@ public class FileLockClusterView extends AbstractCamelClusterView { // Used only during service startup as each context could try to access it concurrently. // It isolates the critical section making sure only one service creates the files. private static final ReentrantLock contextStartLock = new ReentrantLock(); - private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterView.class); + private final ClusterMember localMember; private final Path leaderLockPath; private final Path leaderDataPath; private final AtomicReference<FileLockClusterLeaderInfo> clusterLeaderInfoRef = new AtomicReference<>(); + private final AtomicBoolean firstAcquireLockAttemptCompleted = new AtomicBoolean(); private RandomAccessFile leaderLockFile; private RandomAccessFile leaderDataFile; private FileLock lock; private ScheduledFuture<?> task; private int heartbeatTimeoutMultiplier; - private long acquireLockIntervalDelayNanoseconds; + private long acquireLockIntervalMilliseconds; + private long acquireLeadershipBackoffMilliseconds; FileLockClusterView(FileLockClusterService cluster, String namespace) { super(cluster, namespace); @@ -114,7 +117,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { } FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class); - acquireLockIntervalDelayNanoseconds = TimeUnit.NANOSECONDS.convert( + acquireLockIntervalMilliseconds = TimeUnit.MILLISECONDS.convert( service.getAcquireLockInterval(), service.getAcquireLockIntervalUnit()); @@ -123,6 
+126,14 @@ public class FileLockClusterView extends AbstractCamelClusterView { throw new IllegalArgumentException("HeartbeatTimeoutMultiplier must be greater than 0"); } + long acquireLeadershipBackoff = service.getAcquireLeadershipBackoff(); + if (acquireLeadershipBackoff < 0) { + throw new IllegalArgumentException("acquireLeadershipBackoff must not be a negative value"); + } + acquireLeadershipBackoffMilliseconds = TimeUnit.MILLISECONDS.convert( + acquireLeadershipBackoff, + service.getAcquireLeadershipBackoffIntervalUnit()); + ScheduledExecutorService executor = service.getExecutor(); task = executor.scheduleWithFixedDelay(this::tryLock, TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(), service.getAcquireLockDelayUnit()), @@ -148,6 +159,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { closeInternal(); localMember.setStatus(ClusterMemberStatus.STOPPED); clusterLeaderInfoRef.set(null); + firstAcquireLockAttemptCompleted.set(false); } private void closeInternal() { @@ -211,10 +223,10 @@ public class FileLockClusterView extends AbstractCamelClusterView { LOGGER.info("Lock on file {} lost (lock={}, cluster-member-id={})", leaderLockPath, lock, localMember.getUuid()); localMember.setStatus(ClusterMemberStatus.FOLLOWER); + fireLeadershipChangedEvent((CamelClusterMember) null); releaseFileLock(); closeLockFiles(); lock = null; - fireLeadershipChangedEvent((CamelClusterMember) null); return; } @@ -240,23 +252,38 @@ public class FileLockClusterView extends AbstractCamelClusterView { leaderDataFile = new RandomAccessFile(leaderDataPath.toFile(), "rw"); lock = null; - if (Files.isReadable(leaderLockPath)) { - lock = leaderLockFile.getChannel().tryLock(0, Math.max(1, leaderLockFile.getChannel().size()), false); - } + lock = leaderLockFile.getChannel().tryLock(0, Math.max(1, leaderLockFile.getChannel().size()), false); if (lock != null) { LOGGER.info("Lock on file {} acquired (lock={}, cluster-member-id={})", leaderLockPath, lock, 
localMember.getUuid()); - localMember.setStatus(ClusterMemberStatus.LEADER); - clusterLeaderInfoRef.set(null); - fireLeadershipChangedEvent(localMember); - writeClusterLeaderInfo(true); + + boolean interrupted = false; + if (firstAcquireLockAttemptCompleted.get()) { + // If this is not the initial attempt to claim leadership, give some time for the existing leader to fully relinquish leadership + interrupted = applyAcquireLeadershipBackoff(); + } + + if (interrupted) { + // applyAcquireLeadershipBackoff sleep was interrupted, likely because we are being shutdown + lock = null; + } else { + localMember.setStatus(ClusterMemberStatus.LEADER); + clusterLeaderInfoRef.set(null); + firstAcquireLockAttemptCompleted.set(false); + fireLeadershipChangedEvent(localMember); + writeClusterLeaderInfo(true); + } } else { LOGGER.debug("Lock on file {} not acquired", leaderLockPath); } } else { LOGGER.debug("Existing cluster leader is valid. Retrying leadership acquisition on next interval"); } + + if (localMember.getStatus().equals(ClusterMemberStatus.FOLLOWER)) { + firstAcquireLockAttemptCompleted.set(true); + } } catch (OverlappingFileLockException e) { reason = new IOException(e); } catch (Exception e) { @@ -275,7 +302,7 @@ public class FileLockClusterView extends AbstractCamelClusterView { return FileLockClusterUtils.isLeaderStale( clusterLeaderInfo, previousClusterLeaderInfo, - System.nanoTime(), + System.currentTimeMillis(), heartbeatTimeoutMultiplier); } @@ -286,8 +313,8 @@ public class FileLockClusterView extends AbstractCamelClusterView { void writeClusterLeaderInfo(boolean forceMetaData) throws IOException { FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( localMember.getUuid(), - acquireLockIntervalDelayNanoseconds, - System.nanoTime()); + acquireLockIntervalMilliseconds, + System.currentTimeMillis()); FileLockClusterUtils.writeClusterLeaderInfo( leaderDataPath, @@ -300,11 +327,11 @@ public class FileLockClusterView extends 
AbstractCamelClusterView { if (localMember.isLeader()) { try { FileLockClusterLeaderInfo leaderInfo = FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath); - return lock != null + return leaderInfo != null + && lock != null && lock.isValid() - && Files.exists(leaderLockPath) - && leaderInfo != null - && localMember.getUuid().equals(leaderInfo.getId()); + && localMember.getUuid().equals(leaderInfo.getId()) + && Files.exists(leaderLockPath); } catch (Exception e) { LOGGER.debug("Failed to read {} (cluster-member-id={})", leaderLockPath, localMember.getUuid(), e); return false; @@ -313,6 +340,20 @@ public class FileLockClusterView extends AbstractCamelClusterView { return false; } + boolean applyAcquireLeadershipBackoff() { + boolean interrupted = false; + if (acquireLeadershipBackoffMilliseconds > 0) { + try { + LOGGER.debug("Waiting {} milliseconds before claiming leadership", acquireLeadershipBackoffMilliseconds); + Thread.sleep(acquireLeadershipBackoffMilliseconds); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + interrupted = true; + } + } + return interrupted; + } + private final class ClusterMember implements CamelClusterMember { private final AtomicReference<ClusterMemberStatus> status = new AtomicReference<>(ClusterMemberStatus.STOPPED); private final String uuid = UUID.randomUUID().toString(); diff --git a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java index 072ce576cd60..3e6f9db8a578 100644 --- a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java +++ b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java @@ -40,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class FileLockClusterUtilsTest { @Test void nullLeaderInfoIsStale() { - 
assertTrue(FileLockClusterUtils.isLeaderStale(null, null, System.nanoTime(), 5)); + assertTrue(FileLockClusterUtils.isLeaderStale(null, null, System.currentTimeMillis(), 5)); } @Test @@ -48,34 +48,36 @@ class FileLockClusterUtilsTest { String clusterMemberId = UUID.randomUUID().toString(); FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), - System.nanoTime()); + TimeUnit.SECONDS.toMillis(1), + System.currentTimeMillis()); FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), - System.nanoTime()); + TimeUnit.SECONDS.toMillis(1), + System.currentTimeMillis()); assertFalse( - FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 5)); + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, + System.currentTimeMillis(), 5)); } @Test void sameHeartbeatIsStale() { String clusterMemberId = UUID.randomUUID().toString(); - long heartbeatNanoseconds = System.nanoTime() - TimeUnit.SECONDS.toNanos(10); + long heartbeatMilliseconds = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10); FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), - heartbeatNanoseconds); + TimeUnit.SECONDS.toMillis(1), + heartbeatMilliseconds); FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), - heartbeatNanoseconds); + TimeUnit.SECONDS.toMillis(1), + heartbeatMilliseconds); assertTrue( - FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 3)); + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, + System.currentTimeMillis(), 3)); } @Test @@ -83,34 +85,35 @@ class FileLockClusterUtilsTest { String 
clusterMemberId = UUID.randomUUID().toString(); FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), - System.nanoTime() - TimeUnit.SECONDS.toNanos(5)); + TimeUnit.SECONDS.toMillis(1), + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5)); FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), - System.nanoTime() - TimeUnit.SECONDS.toNanos(10)); + TimeUnit.SECONDS.toMillis(1), + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10)); assertTrue( - FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 3)); + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, + System.currentTimeMillis(), 3)); } @Test void heartbeatExactlyAtThreshold() { int heartbeatMultiplier = 3; - long now = System.nanoTime(); - long updateInterval = TimeUnit.SECONDS.toNanos(1); + long now = System.currentTimeMillis(); + long updateInterval = TimeUnit.SECONDS.toMillis(1); long heartbeat = now - (updateInterval * heartbeatMultiplier); String clusterMemberId = UUID.randomUUID().toString(); FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), + TimeUnit.SECONDS.toMillis(1), heartbeat); FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( clusterMemberId, - TimeUnit.SECONDS.toNanos(1), + TimeUnit.SECONDS.toMillis(1), heartbeat); assertFalse(FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, now, @@ -121,16 +124,17 @@ class FileLockClusterUtilsTest { void leaderChangedNotStale() { FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( UUID.randomUUID().toString(), - TimeUnit.SECONDS.toNanos(1), - System.nanoTime()); + TimeUnit.SECONDS.toMillis(1), + System.currentTimeMillis()); 
FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( UUID.randomUUID().toString(), - TimeUnit.SECONDS.toNanos(1), - System.nanoTime()); + TimeUnit.SECONDS.toMillis(1), + System.currentTimeMillis()); assertFalse( - FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 3)); + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, + System.currentTimeMillis(), 3)); } @Test @@ -189,8 +193,8 @@ class FileLockClusterUtilsTest { FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(lockFile); assertNotNull(clusterLeaderInfo); - assertEquals(1L, clusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds()); - assertEquals(2L, clusterLeaderInfo.getHeartbeatNanoseconds()); + assertEquals(1L, clusterLeaderInfo.getHeartbeatUpdateIntervalMilliseconds()); + assertEquals(2L, clusterLeaderInfo.getHeartbeatMilliseconds()); assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId())); } } diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java index e7746679d041..515cee417a84 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java @@ -305,7 +305,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceT clusterLeader.stop(); // Make the cluster data file appear stale (i.e. 
not updated within acceptable bounds) - long staleHeartbeatTimestamp = leaderInfo.get().getHeartbeatNanoseconds() - TimeUnit.SECONDS.toNanos(100); + long staleHeartbeatTimestamp = leaderInfo.get().getHeartbeatMilliseconds() - TimeUnit.SECONDS.toMillis(100); FileLockClusterLeaderInfo updatedInfo = new FileLockClusterLeaderInfo( diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java index 72b49bc1b486..a6b3e1c784d1 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java @@ -167,6 +167,71 @@ class FileLockClusterServiceBasicFailoverTest extends FileLockClusterServiceTest assertEquals(0, Files.size(dataFile)); } + @Test + void clusterFailoverWithBackoffWhenLeaderCamelContextStopped() throws Exception { + CamelContext clusterLeader = createCamelContext(); + + ClusterConfig followerConfig = new ClusterConfig(); + followerConfig.setAcquireLockDelay(2); + followerConfig.setAcquireLeadershipBackoff(5); + CamelContext clusterFollower = createCamelContext(followerConfig); + + try { + MockEndpoint mockEndpointClustered = clusterLeader.getEndpoint("mock:result", MockEndpoint.class); + mockEndpointClustered.expectedMessageCount(5); + + clusterLeader.start(); + clusterFollower.start(); + + mockEndpointClustered.assertIsSatisfied(); + + AtomicReference<String> leaderId = new AtomicReference<>(); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + assertTrue(Files.exists(lockFile)); + assertTrue(Files.exists(dataFile)); + assertTrue(getClusterMember(clusterLeader).isLeader()); + + FileLockClusterLeaderInfo clusterLeaderInfo = 
FileLockClusterUtils.readClusterLeaderInfo(dataFile); + assertNotNull(clusterLeaderInfo); + + assertNotNull(clusterLeaderInfo.getId()); + assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId())); + leaderId.set(clusterLeaderInfo.getId()); + }); + + // Wait enough time for the follower to have run its lock acquisition scheduled task + Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis()); + + // The follower should not have produced any messages + MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class); + assertTrue(mockEndpointFollower.getExchanges().isEmpty()); + + // Stop the cluster leader + clusterLeader.stop(); + + // Backoff is configured so the follower should not claim leadership within most of that period + Awaitility.await().during(Duration.ofMillis(4900)).untilAsserted(() -> { + assertFalse(getClusterMember(clusterFollower).isLeader()); + }); + + // Verify the follower was elected as the new cluster leader + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + FileLockClusterLeaderInfo updatedClusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile); + assertNotNull(updatedClusterLeaderInfo); + + String newLeaderId = updatedClusterLeaderInfo.getId(); + assertNotNull(newLeaderId); + assertDoesNotThrow(() -> UUID.fromString(newLeaderId)); + assertNotEquals(leaderId.get(), newLeaderId); + assertEquals(5, mockEndpointFollower.getExchanges().size()); + }); + } finally { + clusterFollower.stop(); + } + + assertEquals(0, Files.size(dataFile)); + } + @Test void singleClusterMemberRecoversLeadershipIfUUIDRemovedFromLockFile() throws Exception { try (CamelContext clusterLeader = createCamelContext()) { @@ -245,4 +310,17 @@ class FileLockClusterServiceBasicFailoverTest extends FileLockClusterServiceTest }); assertIsInstanceOf(IllegalArgumentException.class, exception.getCause()); } + + @Test + void negativeAcquireLeadershipBackoffThrowsException() throws Exception { 
+ ClusterConfig config = new ClusterConfig(); + config.setAcquireLeadershipBackoff(-1); + + Exception exception = assertThrows(Exception.class, () -> { + try (CamelContext camelContext = createCamelContext(config)) { + camelContext.start(); + } + }); + assertIsInstanceOf(IllegalArgumentException.class, exception.getCause()); + } } diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java index 72290aaacbbb..ee846f168467 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java @@ -67,6 +67,7 @@ abstract class FileLockClusterServiceTestBase { service.setAcquireLockInterval(1); service.setRoot(clusterDir.toString()); service.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier()); + service.setAcquireLeadershipBackoff(config.getAcquireLeadershipBackoff()); return service; } @@ -83,6 +84,7 @@ abstract class FileLockClusterServiceTestBase { private long acquireLockDelay = 1; private long timerRepeatCount = 5; private int heartbeatTimeoutMultiplier = 5; + private long acquireLeadershipBackoff = 0; long getAcquireLockDelay() { return acquireLockDelay; @@ -111,5 +113,13 @@ abstract class FileLockClusterServiceTestBase { public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) { this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier; } + + public long getAcquireLeadershipBackoff() { + return acquireLeadershipBackoff; + } + + public void setAcquireLeadershipBackoff(long acquireLeadershipBackoff) { + this.acquireLeadershipBackoff = acquireLeadershipBackoff; + } } } diff --git a/docs/user-manual/modules/ROOT/pages/clustering.adoc 
b/docs/user-manual/modules/ROOT/pages/clustering.adoc index 8410159acf63..52ee7aaff7c8 100644 --- a/docs/user-manual/modules/ROOT/pages/clustering.adoc +++ b/docs/user-manual/modules/ROOT/pages/clustering.adoc @@ -45,7 +45,7 @@ Out of the box camel provides the following implementations: Configuration options: -*ConsulClusterService* +=== ConsulClusterService [options="header", cols="15,55,15,15"] |=== @@ -56,7 +56,7 @@ Configuration options: | rootPath | The Consul cluster root directory path | /camel | String |=== -*FileLockClusterService* +=== FileLockClusterService [options="header", cols="15,55,15,15"] |=== @@ -65,11 +65,32 @@ Configuration options: | acquireLockDelayUnit | The time unit for acquireLockDelay | SECONDS | TimeUnit | acquireLockInterval | The time to wait between attempts to try to acquire the cluster lock | 10 | long | acquireLockIntervalUnit | The time unit for acquireLockInterval | SECONDS | TimeUnit -| heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to {@code 2s * 3 = 6s} of silence before declaring the leader unavailable | 5 | int +| acquireLeadershipBackoff | The time to wait before a cluster member broadcasts acquisition of gaining cluster leadership. A value of 0 (the default) disables the backoff wait period. This can be useful to introduce a delay to ensure that the cluster leader has fully surrendered its leadership. A sensible value is acquireLockDelay + 5 seconds. 
Or whatever is appropriate for your environment and requirements | 0 | long +| acquireLeadershipBackoffIntervalUnit | The time unit for acquireLeadershipBackoff | SECONDS | TimeUnit +| heartbeatTimeoutMultiplier | Multiplier applied to the cluster leader acquireLockInterval to determine how long followers should wait before considering the leader "stale". For example, if the leader updates its heartbeat every 2 seconds and the heartbeatTimeoutMultiplier is 3, followers will tolerate up to `2s * 3 = 6s` of silence before declaring the leader unavailable | 5 | int | rootPath | The file cluster root directory path | | String |=== -*InfinispanClusterService* +[IMPORTANT] +==== +As with most clustering implementations, it is important that the clocks on all machines in the file lock cluster are kept in sync. +==== + +==== Storing the cluster root directory on NFS + +When storing the FileLockClusterService cluster root directory (see the rootPath configuration option) on NFS, all cluster members must use NFS client mount options that allow failover to work reliably. + +The following NFS client mount options can help with this. You may need to adjust these options depending on your needs. + +* `soft` - Disables continuous retransmission attempts by the client when the NFS server does not respond to a request. +* `timeo=10` - The time, in deciseconds, the NFS client waits for a response from the NFS server, before it sends another request. The default is 600 (60 seconds). +* `retrans=1` - Specifies the number of times the NFS client attempts to retransmit a failed request to the NFS server. The default is 3. The client waits a `timeo` timeout period between each `retrans` attempt. +* `lookupcache=none` — Specifies how the kernel manages its cache of directory entries for the mount point. `none` forces the client to revalidate all cache entries before they are used. 
This enables the cluster leader to immediately detect any change made to its lock file, and it prevents the lock checking mechanism from returning incorrect validity information. +* `sync` — Any system call that writes data to files on the mount point causes the data to be flushed to the NFS server before the system call returns control to user space. This option provides greater data cache coherence. + +For more information about NFS mount options, see http://linux.die.net/man/5/nfs. + +=== InfinispanClusterService [options="header", cols="15,55,15,15"] |=== @@ -78,7 +99,7 @@ Configuration options: | lifespanTimeUnit | The TimeUnit of the lifespan | SECONDS | TimeUnit |=== -*JGroupsRaftClusterService* +=== JGroupsRaftClusterService [options="header", cols="15,55,15,15"] |=== @@ -89,7 +110,7 @@ Configuration options: | raftId | Unique Raft id | | String |=== -*KubernetesClusterService* +=== KubernetesClusterService [options="header", cols="15,55,15,15"] |=== @@ -106,7 +127,7 @@ Configuration options: | retryPeriodMillis | The time between two subsequent attempts to check and acquire the leadership. It is randomized using the jitter factor | 2000 | long |=== -*ZooKeeperClusterService* +=== ZooKeeperClusterService [options="header", cols="15,55,15,15"] |=== @@ -127,7 +148,7 @@ Configuration options: | basePath | The base path to store in ZooKeeper | | String |=== -Configuration examples: +=== Configuration examples: - *Spring Boot* +
