This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit c3644164bf4e61c9af7f43537405acc7443e51cb Author: James Netherton <[email protected]> AuthorDate: Mon Sep 22 07:20:25 2025 +0100 CAMEL-22430: Make FileClusterService more resilient to split-brain scenarios --- components/camel-file/pom.xml | 6 + .../file/cluster/FileLockClusterLeaderInfo.java | 71 +++++ .../file/cluster/FileLockClusterService.java | 22 +- .../file/cluster/FileLockClusterUtils.java | 172 +++++++++++ .../file/cluster/FileLockClusterView.java | 246 +++++++++++---- .../file/cluster/FileLockClusterUtilsTest.java | 196 ++++++++++++ ...FileLockClusterServiceAdvancedFailoverTest.java | 333 +++++++++++++++++++++ .../FileLockClusterServiceBasicFailoverTest.java | 248 +++++++++++++++ .../cluster/FileLockClusterServiceTestBase.java | 115 +++++++ 9 files changed, 1356 insertions(+), 53 deletions(-) diff --git a/components/camel-file/pom.xml b/components/camel-file/pom.xml index e8ea59a8d28..98e19a55e6e 100644 --- a/components/camel-file/pom.xml +++ b/components/camel-file/pom.xml @@ -48,5 +48,11 @@ <artifactId>commons-codec</artifactId> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java new file mode 100644 index 00000000000..6d83d335718 --- /dev/null +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.cluster; + +import java.util.Objects; + +/** + * Holds information about a file lock cluster leader. + */ +final class FileLockClusterLeaderInfo { + private final String id; + private final long heartbeatUpdateIntervalNanoseconds; + private final long heartbeatNanoseconds; + + /** + * Constructs a {@link FileLockClusterLeaderInfo}. + * + * @param id The unique UUID assigned to the cluster leader + * @param heartbeatUpdateIntervalNanoseconds The cluster leader heartbeat update interval value in nanoseconds + * @param heartbeatNanoseconds The cluster leader heartbeat value in nanoseconds + */ + FileLockClusterLeaderInfo(String id, long heartbeatUpdateIntervalNanoseconds, long heartbeatNanoseconds) { + Objects.requireNonNull(id); + this.id = id; + this.heartbeatUpdateIntervalNanoseconds = heartbeatUpdateIntervalNanoseconds; + this.heartbeatNanoseconds = heartbeatNanoseconds; + } + + String getId() { + return id; + } + + long getHeartbeatNanoseconds() { + return heartbeatNanoseconds; + } + + long getHeartbeatUpdateIntervalNanoseconds() { + return heartbeatUpdateIntervalNanoseconds; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FileLockClusterLeaderInfo that = (FileLockClusterLeaderInfo) o; + return Objects.equals(id, that.id); + } + 
+ @Override + public int hashCode() { + return Objects.hashCode(id); + } +} diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java index e5522efdf33..e21568925f0 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java @@ -31,12 +31,14 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock private long acquireLockInterval; private TimeUnit acquireLockIntervalUnit; private ScheduledExecutorService executor; + private int heartbeatTimeoutMultiplier; public FileLockClusterService() { this.acquireLockDelay = 1; this.acquireLockDelayUnit = TimeUnit.SECONDS; this.acquireLockInterval = 10; this.acquireLockIntervalUnit = TimeUnit.SECONDS; + this.heartbeatTimeoutMultiplier = 5; } @Override @@ -76,7 +78,7 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock } /** - * The time unit fo the acquireLockDelay, default to TimeUnit.SECONDS. + * The time unit for the acquireLockDelay, default to TimeUnit.SECONDS. */ public void setAcquireLockDelayUnit(TimeUnit acquireLockDelayUnit) { this.acquireLockDelayUnit = acquireLockDelayUnit; @@ -103,12 +105,28 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock } /** - * The time unit fo the acquireLockInterva, default to TimeUnit.SECONDS. + * The time unit for the acquireLockInterval, default to TimeUnit.SECONDS. */ public void setAcquireLockIntervalUnit(TimeUnit acquireLockIntervalUnit) { this.acquireLockIntervalUnit = acquireLockIntervalUnit; } + /** + * Multiplier applied to the cluster leader {@code acquireLockInterval} to determine how long followers should wait + * before considering the leader "stale". 
+ * <p> + * For example, if the leader updates its heartbeat every 2 seconds and the {@code heartbeatTimeoutMultiplier} is 3, + * followers will tolerate up to {@code 2s * 3 = 6s} of silence before declaring the leader unavailable. + * The value must be greater than 0 and defaults to 5. + */ + public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) { + this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier; + } + + public int getHeartbeatTimeoutMultiplier() { + return heartbeatTimeoutMultiplier; + } + @Override protected void doStop() throws Exception { super.doStop(); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java new file mode 100644 index 00000000000..ebda14fe104 --- /dev/null +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.camel.component.file.cluster; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; + +/** + * Miscellaneous utility methods for managing the file lock cluster state. + */ +final class FileLockClusterUtils { + /** + * Length of the UTF-8 encoded bytes of a java.util.UUID string. + */ + static final int UUID_BYTE_LENGTH = 36; + /** + * The lock file buffer capacity when writing data for the cluster leader. Fields are written in this order: + * <ul> + * <li>Cluster leader ID (UUID String) - 36 bytes</li> + * <li>Cluster leader heartbeat update interval (long) - 8 bytes</li> + * <li>Cluster leader heartbeat timestamp (long) - 8 bytes</li> + * </ul> + */ + static final int LOCKFILE_BUFFER_SIZE = UUID_BYTE_LENGTH + 2 * Long.BYTES; + + private FileLockClusterUtils() { + // Utility class + } + + /** + * Writes information about the state of the cluster leader to the lock file.
+ * + * @param leaderDataPath The path to the lock file + * @param channel The file channel to write to + * @param clusterLeaderInfo The {@link FileLockClusterLeaderInfo} instance where the cluster leader state is held + * @param forceMetaData Whether to force changes to both the file content and metadata + * @throws IOException If the lock file is missing or writing data failed + */ + static void writeClusterLeaderInfo( + Path leaderDataPath, + FileChannel channel, + FileLockClusterLeaderInfo clusterLeaderInfo, + boolean forceMetaData) + throws IOException { + + Objects.requireNonNull(channel, "channel cannot be null"); + Objects.requireNonNull(clusterLeaderInfo, "clusterLeaderInfo cannot be null"); + + if (!Files.exists(leaderDataPath)) { + throw new FileNotFoundException("Cluster leader data file " + leaderDataPath + " not found"); + } + + String uuidStr = clusterLeaderInfo.getId(); + byte[] uuidBytes = uuidStr.getBytes(StandardCharsets.UTF_8); + + ByteBuffer buf = ByteBuffer.allocate(LOCKFILE_BUFFER_SIZE); + buf.put(uuidBytes); + buf.putLong(clusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds()); + buf.putLong(clusterLeaderInfo.getHeartbeatNanoseconds()); + buf.flip(); + + if (forceMetaData) { + channel.truncate(0); + } + + channel.position(0); + while (buf.hasRemaining()) { + channel.write(buf); + } + channel.force(forceMetaData); + } + + /** + * Reads information about the state of the cluster leader from the lock file. + * + * @param leaderDataPath The path to the lock file + * @return {@link FileLockClusterLeaderInfo} instance representing the state of the cluster leader. 
+ * {@code null} if the lock file does not exist or its content is incomplete (shorter than + * {@link #LOCKFILE_BUFFER_SIZE} bytes) + * @throws IOException If reading the lock file failed + */ + static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) throws IOException { + if (!Files.exists(leaderDataPath)) { + return null; + } + + byte[] bytes = Files.readAllBytes(leaderDataPath); + if (bytes.length < LOCKFILE_BUFFER_SIZE) { + // Data is incomplete or in a transient / corrupt state + return null; + } + + // Parse the cluster leader data + ByteBuffer buf = ByteBuffer.wrap(bytes); + byte[] uuidBytes = new byte[UUID_BYTE_LENGTH]; + buf.get(uuidBytes); + + String uuidStr = new String(uuidBytes, StandardCharsets.UTF_8); + long intervalNanos = buf.getLong(); + long lastHeartbeat = buf.getLong(); + + return new FileLockClusterLeaderInfo(uuidStr, intervalNanos, lastHeartbeat); + } + + /** + * Determines whether the current cluster leader is stale. Typically, this is when the leader has not updated the + * cluster lock file within acceptable bounds. + * + * @param latestClusterLeaderInfo The {@link FileLockClusterLeaderInfo} instance representing the latest cluster + * leader state + * @param previousClusterLeaderInfo The {@link FileLockClusterLeaderInfo} instance representing the previously + * recorded cluster leader state + * @param currentNanoTime The current time in nanoseconds, as returned by {@link System#nanoTime()} + * @param heartbeatTimeoutMultiplier Multiplier applied to the leader heartbeat update interval to derive the stale timeout + * @return {@code true} if the leader is considered stale.
{@code false} if the leader is + * still active + */ + static boolean isLeaderStale( + FileLockClusterLeaderInfo latestClusterLeaderInfo, + FileLockClusterLeaderInfo previousClusterLeaderInfo, + long currentNanoTime, + int heartbeatTimeoutMultiplier) { + + if (latestClusterLeaderInfo == null) { + return true; + } + + // Cluster leader changed since last observation so assume not stale + if (!latestClusterLeaderInfo.equals(previousClusterLeaderInfo)) { + return false; + } + + final long latestHeartbeat = latestClusterLeaderInfo.getHeartbeatNanoseconds(); + final long previousObservedHeartbeat = previousClusterLeaderInfo.getHeartbeatNanoseconds(); + + if (latestHeartbeat > previousObservedHeartbeat) { + // Not stale. Cluster leader is alive and updating the lock file + return false; + } + + if (latestHeartbeat < previousObservedHeartbeat) { + // Heartbeat somehow went backwards, maybe due to stale data + return true; + } + + // Check if cluster leader has updated the lock file within acceptable limits + final long elapsed = currentNanoTime - previousObservedHeartbeat; + final long heartbeatUpdateIntervalNanoseconds = latestClusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds(); + final long timeout = heartbeatUpdateIntervalNanoseconds * (long) heartbeatTimeoutMultiplier; + return elapsed > timeout; + } +} diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java index a2e9ad9dfd7..0fe775ae582 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java @@ -18,6 +18,7 @@ package org.apache.camel.component.file.cluster; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.nio.channels.FileLock; 
import java.nio.channels.OverlappingFileLockException; import java.nio.file.Files; @@ -25,12 +26,13 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; import org.apache.camel.cluster.CamelClusterMember; import org.apache.camel.support.cluster.AbstractCamelClusterView; @@ -39,20 +41,24 @@ import org.slf4j.LoggerFactory; public class FileLockClusterView extends AbstractCamelClusterView { private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterView.class); - - private static final Lock LOCK = new ReentrantLock(); private final ClusterMember localMember; - private final Path path; - private RandomAccessFile lockFile; + private final Path leaderLockPath; + private final Path leaderDataPath; + private final AtomicReference<FileLockClusterLeaderInfo> clusterLeaderInfoRef = new AtomicReference<>(); + private RandomAccessFile leaderLockFile; + private RandomAccessFile leaderDataFile; private FileLock lock; private ScheduledFuture<?> task; + private int heartbeatTimeoutMultiplier; + private long acquireLockIntervalDelayNanoseconds; FileLockClusterView(FileLockClusterService cluster, String namespace) { super(cluster, namespace); + Objects.requireNonNull(cluster.getRoot(), "FileLockClusterService root directory must be specified"); this.localMember = new ClusterMember(); - this.path = Paths.get(cluster.getRoot(), namespace); - + this.leaderLockPath = Paths.get(cluster.getRoot(), namespace); + this.leaderDataPath = Paths.get(cluster.getRoot(), namespace + ".dat"); } @Override @@ -67,63 +73,105 @@ public class FileLockClusterView extends 
AbstractCamelClusterView { @Override public List<CamelClusterMember> getMembers() { - // It may be useful to lock only a region of the file an then have views + // It may be useful to lock only a region of the file and then have views // appending their id to the file on different regions so we can // have a list of members. Root/Header region that is used for locking - // purpose may also contains the lock holder. + // purpose may also contain the lock holder. return Collections.emptyList(); } @Override protected void doStart() throws Exception { - if (lockFile != null) { + if (leaderLockFile != null) { closeInternal(); - fireLeadershipChangedEvent((CamelClusterMember) null); } - if (!Files.exists(path.getParent())) { - Files.createDirectories(path.getParent()); + if (!Files.exists(leaderLockPath.getParent())) { + Files.createDirectories(leaderLockPath.getParent()); + } + + if (!Files.exists(leaderLockPath)) { + Files.createFile(leaderLockPath); + } + + if (!Files.exists(leaderDataPath)) { + Files.createFile(leaderDataPath); } FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class); - ScheduledExecutorService executor = service.getExecutor(); + acquireLockIntervalDelayNanoseconds = TimeUnit.NANOSECONDS.convert( + service.getAcquireLockInterval(), + service.getAcquireLockIntervalUnit()); - task = executor.scheduleAtFixedRate(this::tryLock, + heartbeatTimeoutMultiplier = service.getHeartbeatTimeoutMultiplier(); + if (heartbeatTimeoutMultiplier <= 0) { + throw new IllegalArgumentException("HeartbeatTimeoutMultiplier must be greater than 0"); + } + + ScheduledExecutorService executor = service.getExecutor(); + task = executor.scheduleWithFixedDelay(this::tryLock, TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(), service.getAcquireLockDelayUnit()), TimeUnit.MILLISECONDS.convert(service.getAcquireLockInterval(), service.getAcquireLockIntervalUnit()), TimeUnit.MILLISECONDS); + + 
localMember.setStatus(ClusterMemberStatus.STARTED); } @Override protected void doStop() throws Exception { + if (localMember.isLeader() && leaderDataFile != null) { + try { + FileChannel channel = leaderDataFile.getChannel(); + channel.truncate(0); + channel.force(true); + } catch (Exception e) { + // Log and ignore since we need to release the file lock and do cleanup + LOGGER.debug("Failed to truncate {} on {} stop", leaderDataPath, getClass().getSimpleName(), e); + } + } + closeInternal(); + localMember.setStatus(ClusterMemberStatus.STOPPED); + clusterLeaderInfoRef.set(null); } - // ********************************* - // - // ********************************* - - private void closeInternal() throws Exception { + private void closeInternal() { if (task != null) { task.cancel(true); } - if (lock != null) { - lock.release(); + releaseFileLock(); + closeLockFiles(); + } + + private void closeLockFiles() { + if (leaderLockFile != null) { + try { + leaderLockFile.close(); + } catch (Exception ignore) { + // Ignore + } + leaderLockFile = null; } - closeLockFile(); + if (leaderDataFile != null) { + try { + leaderDataFile.close(); + } catch (Exception ignore) { + // Ignore + } + leaderDataFile = null; + } } - private void closeLockFile() { - if (lockFile != null) { + private void releaseFileLock() { + if (lock != null) { try { - lockFile.close(); + lock.release(); } catch (Exception ignore) { // Ignore } - lockFile = null; } } @@ -132,55 +180,132 @@ public class FileLockClusterView extends AbstractCamelClusterView { Exception reason = null; try { - if (localMember.isLeader()) { - LOGGER.trace("Holding the lock on file {} (lock={})", path, lock); + if (isLeaderInternal()) { + LOGGER.debug("Holding the lock on file {} (lock={}, cluster-member-id={})", leaderLockPath, lock, + localMember.getUuid()); + try { + // Update the cluster data file with the leader state so that other cluster members can interrogate it + writeClusterLeaderInfo(false); + return; + } catch 
(IOException e) { + LOGGER.debug("Failed writing cluster leader data to {}", leaderDataPath, e); + } + } + + // Non-null lock at this point signifies leadership has been lost or relinquished + if (lock != null) { + LOGGER.info("Lock on file {} lost (lock={}, cluster-member-id={})", leaderLockPath, lock, + localMember.getUuid()); + localMember.setStatus(ClusterMemberStatus.FOLLOWER); + releaseFileLock(); + closeLockFiles(); + lock = null; + fireLeadershipChangedEvent((CamelClusterMember) null); return; } - LOCK.lock(); - try { - if (lock != null) { - LOGGER.info("Lock on file {} lost (lock={})", path, lock); - fireLeadershipChangedEvent((CamelClusterMember) null); + // Must be follower to reach here + localMember.setStatus(ClusterMemberStatus.FOLLOWER); + + // Get & update cluster leader state + LOGGER.debug("Reading cluster leader state from {}", leaderDataPath); + FileLockClusterLeaderInfo latestClusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath); + FileLockClusterLeaderInfo previousClusterLeaderInfo = clusterLeaderInfoRef.getAndSet(latestClusterLeaderInfo); + + // Check if we can attempt to take cluster leadership + if (isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo) + || canReclaimLeadership(latestClusterLeaderInfo)) { + if (previousClusterLeaderInfo != null && canReclaimLeadership(previousClusterLeaderInfo)) { + // Backoff so the current cluster leader can notice leadership is relinquished + return; } - LOGGER.debug("Try to acquire a lock on {}", path); - lockFile = new RandomAccessFile(path.toFile(), "rw"); + // Attempt to obtain cluster leadership + LOGGER.debug("Try to acquire a lock on {} (cluster-member-id={})", leaderLockPath, localMember.getUuid()); + leaderLockFile = new RandomAccessFile(leaderLockPath.toFile(), "rw"); + leaderDataFile = new RandomAccessFile(leaderDataPath.toFile(), "rw"); lock = null; - lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false); + if 
(Files.isReadable(leaderLockPath)) { + lock = leaderLockFile.getChannel().tryLock(0, Math.max(1, leaderLockFile.getChannel().size()), false); + } if (lock != null) { - LOGGER.info("Lock on file {} acquired (lock={})", path, lock); + LOGGER.info("Lock on file {} acquired (lock={}, cluster-member-id={})", leaderLockPath, lock, + localMember.getUuid()); + localMember.setStatus(ClusterMemberStatus.LEADER); + clusterLeaderInfoRef.set(null); fireLeadershipChangedEvent(localMember); + writeClusterLeaderInfo(true); } else { - LOGGER.debug("Lock on file {} not acquired ", path); + LOGGER.debug("Lock on file {} not acquired", leaderLockPath); } - } finally { - LOCK.unlock(); + } else { + LOGGER.debug("Existing cluster leader is valid. Retrying leadership acquisition on next interval"); } } catch (OverlappingFileLockException e) { reason = new IOException(e); } catch (Exception e) { reason = e; + } finally { + if (lock == null) { + LOGGER.debug("Lock on file {} not acquired (cluster-member-id={})", leaderLockPath, localMember.getUuid(), + reason); + closeLockFiles(); + } } + } + } + + boolean isLeaderStale(FileLockClusterLeaderInfo clusterLeaderInfo, FileLockClusterLeaderInfo previousClusterLeaderInfo) { + return FileLockClusterUtils.isLeaderStale( + clusterLeaderInfo, + previousClusterLeaderInfo, + System.nanoTime(), + heartbeatTimeoutMultiplier); + } + + boolean canReclaimLeadership(FileLockClusterLeaderInfo leaderInfo) { + return leaderInfo != null && localMember.getUuid().equals(leaderInfo.getId()); + } + + void writeClusterLeaderInfo(boolean forceMetaData) throws IOException { + FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( + localMember.getUuid(), + acquireLockIntervalDelayNanoseconds, + System.nanoTime()); - if (lock == null) { - LOGGER.debug("Lock on file {} not acquired ", path, reason); - closeLockFile(); + FileLockClusterUtils.writeClusterLeaderInfo( + leaderDataPath, + leaderDataFile.getChannel(), + latestClusterLeaderInfo, + 
forceMetaData); + } + + boolean isLeaderInternal() { + if (localMember.isLeader()) { + try { + FileLockClusterLeaderInfo leaderInfo = FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath); + return lock != null + && lock.isValid() + && Files.exists(leaderLockPath) + && leaderInfo != null + && localMember.getUuid().equals(leaderInfo.getId()); + } catch (Exception e) { + LOGGER.debug("Failed to read {} (cluster-member-id={})", leaderLockPath, localMember.getUuid(), e); + return false; } } + return false; } private final class ClusterMember implements CamelClusterMember { + private final AtomicReference<ClusterMemberStatus> status = new AtomicReference<>(ClusterMemberStatus.STOPPED); + private final String uuid = UUID.randomUUID().toString(); + @Override public boolean isLeader() { - LOCK.lock(); - try { - return lock != null && lock.isValid(); - } finally { - LOCK.unlock(); - } + return getStatus().equals(ClusterMemberStatus.LEADER); } @Override @@ -192,5 +317,24 @@ public class FileLockClusterView extends AbstractCamelClusterView { public String getId() { return getClusterService().getId(); } + + public String getUuid() { + return uuid; + } + + public ClusterMemberStatus getStatus() { + return status.get(); + } + + private void setStatus(ClusterMemberStatus status) { + this.status.set(status); + } + } + + private enum ClusterMemberStatus { + FOLLOWER, + LEADER, + STARTED, + STOPPED } } diff --git a/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java new file mode 100644 index 00000000000..072ce576cd6 --- /dev/null +++ b/components/camel-file/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterUtilsTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.cluster; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import static org.apache.camel.component.file.cluster.FileLockClusterUtils.LOCKFILE_BUFFER_SIZE; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class FileLockClusterUtilsTest { + @Test + void nullLeaderInfoIsStale() { + assertTrue(FileLockClusterUtils.isLeaderStale(null, null, System.nanoTime(), 5)); + } + + @Test + void newHeartbeatNotStale() { + String clusterMemberId = UUID.randomUUID().toString(); + FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + 
TimeUnit.SECONDS.toNanos(1), + System.nanoTime()); + + FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + TimeUnit.SECONDS.toNanos(1), + System.nanoTime()); + + assertFalse( + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 5)); + } + + @Test + void sameHeartbeatIsStale() { + String clusterMemberId = UUID.randomUUID().toString(); + long heartbeatNanoseconds = System.nanoTime() - TimeUnit.SECONDS.toNanos(10); + FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + TimeUnit.SECONDS.toNanos(1), + heartbeatNanoseconds); + + FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + TimeUnit.SECONDS.toNanos(1), + heartbeatNanoseconds); + + assertTrue( + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 3)); + } + + @Test + void oldHeartbeatStale() { + String clusterMemberId = UUID.randomUUID().toString(); + FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + TimeUnit.SECONDS.toNanos(1), + System.nanoTime() - TimeUnit.SECONDS.toNanos(5)); + + FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + TimeUnit.SECONDS.toNanos(1), + System.nanoTime() - TimeUnit.SECONDS.toNanos(10)); + + assertTrue( + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 3)); + } + + @Test + void heartbeatExactlyAtThreshold() { + int heartbeatMultiplier = 3; + long now = System.nanoTime(); + long updateInterval = TimeUnit.SECONDS.toNanos(1); + long heartbeat = now - (updateInterval * heartbeatMultiplier); + + String clusterMemberId = UUID.randomUUID().toString(); + FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + 
TimeUnit.SECONDS.toNanos(1), + heartbeat); + + FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( + clusterMemberId, + TimeUnit.SECONDS.toNanos(1), + heartbeat); + + assertFalse(FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, now, + heartbeatMultiplier)); + } + + @Test + void leaderChangedNotStale() { + FileLockClusterLeaderInfo previousClusterLeaderInfo = new FileLockClusterLeaderInfo( + UUID.randomUUID().toString(), + TimeUnit.SECONDS.toNanos(1), + System.nanoTime()); + + FileLockClusterLeaderInfo latestClusterLeaderInfo = new FileLockClusterLeaderInfo( + UUID.randomUUID().toString(), + TimeUnit.SECONDS.toNanos(1), + System.nanoTime()); + + assertFalse( + FileLockClusterUtils.isLeaderStale(latestClusterLeaderInfo, previousClusterLeaderInfo, System.nanoTime(), 3)); + } + + @Test + void expectedFileLockBufferSize() { + // To catch cases where the lock file format is modified but the buffer size was not updated + assertEquals(52, LOCKFILE_BUFFER_SIZE); + } + + @Test + void writeClusterLeaderInfoLockNullChannel() { + assertThrows(NullPointerException.class, () -> { + FileLockClusterUtils.writeClusterLeaderInfo(Paths.get("."), null, new FileLockClusterLeaderInfo("", 1L, 1L), true); + }); + } + + @Test + void writeClusterLeaderInfoWithNullData(@TempDir Path tempDir) { + assertThrows(NullPointerException.class, () -> { + try (RandomAccessFile raf = new RandomAccessFile(tempDir.resolve("lock").toFile(), "rw")) { + FileLockClusterUtils.writeClusterLeaderInfo(Paths.get("."), raf.getChannel(), null, true); + } + }); + } + + @Test + void writeClusterLeaderInfoClusterDataFileNotFound(@TempDir Path tempDir) { + assertThrows(FileNotFoundException.class, () -> { + try (RandomAccessFile raf = new RandomAccessFile(tempDir.resolve("leader.dat").toFile(), "rw")) { + FileLockClusterLeaderInfo leaderInfo = new FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 1L); + 
FileLockClusterUtils.writeClusterLeaderInfo(Paths.get("/invalid/data/file"), raf.getChannel(), leaderInfo, + true); + } + }); + } + + @Test + void writeClusterLeaderInfoData(@TempDir Path tempDir) throws IOException { + Path clusterData = tempDir.resolve("leader.dat"); + try (RandomAccessFile raf = new RandomAccessFile(clusterData.toFile(), "rw")) { + FileLockClusterLeaderInfo leaderInfo = new FileLockClusterLeaderInfo(UUID.randomUUID().toString(), 1L, 2L); + FileLockClusterUtils.writeClusterLeaderInfo(clusterData, raf.getChannel(), leaderInfo, true); + assertEquals(LOCKFILE_BUFFER_SIZE, Files.size(clusterData)); + } + } + + @Test + void readClusterLeaderInfoLockFileNotFound() throws Exception { + assertNull(FileLockClusterUtils.readClusterLeaderInfo(Paths.get("/invalid/data/file"))); + } + + @Test + void readClusterLeaderInfoLock(@TempDir Path tempDir) throws Exception { + writeClusterLeaderInfoData(tempDir); + + Path lockFile = tempDir.resolve("leader.dat"); + FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(lockFile); + assertNotNull(clusterLeaderInfo); + + assertEquals(1L, clusterLeaderInfo.getHeartbeatUpdateIntervalNanoseconds()); + assertEquals(2L, clusterLeaderInfo.getHeartbeatNanoseconds()); + assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId())); + } +} diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java new file mode 100644 index 00000000000..e7746679d04 --- /dev/null +++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Failover scenarios for {@link FileLockClusterService} where the cluster lock file is deleted, or the cluster
+ * directory is moved away and later restored to simulate a shared file system becoming detached and reattached.
+ */
+@DisabledOnOs(value = OS.WINDOWS, disabledReason = "Platform file locking impl prevents cluster data move / deletion")
+class FileLockClusterServiceAdvancedFailoverTest extends FileLockClusterServiceTestBase {
+    @Test
+    void singleClusterMemberRecoversLeadershipIfLockFileDeleted() throws Exception {
+        ClusterConfig config = new ClusterConfig();
+        config.setTimerRepeatCount(-1);
+
+        // The config must be passed in, otherwise the default timer repeat count is used instead of -1
+        try (CamelContext clusterLeader = createCamelContext(config)) {
+            MockEndpoint mockEndpoint = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpoint.expectedMinimumMessageCount(1);
+
+            clusterLeader.start();
+
+            mockEndpoint.assertIsSatisfied();
+
+            AtomicReference<String> leaderId = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                assertNotNull(clusterLeaderInfo.getId());
+                assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId()));
+                leaderId.set(clusterLeaderInfo.getId());
+            });
+
+            // Delete the lock file
+            Files.deleteIfExists(lockFile);
+
+            mockEndpoint.reset();
+            mockEndpoint.expectedMinimumMessageCount(1);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Leadership should be regained by the same (sole) cluster member
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo recoveredClusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(recoveredClusterLeaderInfo);
+
+                String recoveredLeaderId = recoveredClusterLeaderInfo.getId();
+                assertNotNull(recoveredLeaderId);
+                assertDoesNotThrow(() -> UUID.fromString(recoveredLeaderId));
+                assertEquals(leaderId.get(), recoveredLeaderId);
+
+                mockEndpoint.assertIsSatisfied();
+            });
+        }
+
+        // After the context is closed the cluster data file is expected to be empty. Check the size rather than
+        // decoding the (binary) file content as a String, which could fail with a charset error instead.
+        assertEquals(0, Files.size(dataFile));
+    }
+
+    @Test
+    void multipleClusterMembersReelectLeaderIfLockFileDeleted() throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+            // Without an expectation, assertIsSatisfied() on the follower endpoint below would pass vacuously
+            mockEndpointFollower.expectedMinimumMessageCount(1);
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Delete the lock file
+            Files.deleteIfExists(lockFile);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Wait for leadership to be gained by one of the members
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                boolean newLeaderElected = false;
+
+                // Original cluster leader regained leadership
+                if (getClusterMember(clusterLeader).isLeader()) {
+                    newLeaderElected = true;
+                    mockEndpointLeader.assertIsSatisfied();
+                }
+
+                // A different cluster member gained leadership
+                if (getClusterMember(clusterFollower).isLeader()) {
+                    newLeaderElected = true;
+                    mockEndpointFollower.assertIsSatisfied();
+                }
+
+                assertTrue(newLeaderElected);
+            });
+        } finally {
+            clusterLeader.stop();
+            clusterFollower.stop();
+        }
+    }
+
+    @Test
+    void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir Path clusterMovedLocation) throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            // The timer repeats indefinitely, so an exact count could race with further deliveries
+            mockEndpointLeader.expectedMinimumMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Simulate the file system becoming detached by moving the cluster data directory
+            Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Simulate reattaching the file system by moving the cluster directory back to the original location
+            Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING);
+
+            // Since the lock file is not considered 'stale', the original leader should resume leadership
+            Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+                mockEndpointLeader.assertIsSatisfied();
+            });
+
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+        } finally {
+            clusterLeader.stop();
+            clusterFollower.stop();
+        }
+    }
+
+    @Test
+    void staleLockFileForRestoredFileSystemElectsNewLeader(@TempDir Path clusterMovedLocation) throws Exception {
+        ClusterConfig leaderConfig = new ClusterConfig();
+        leaderConfig.setTimerRepeatCount(-1);
+
+        CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setTimerRepeatCount(-1);
+        followerConfig.setAcquireLockDelay(2);
+
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointLeader = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            // The timer repeats indefinitely, so an exact count could race with further deliveries
+            mockEndpointLeader.expectedMinimumMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointLeader.assertIsSatisfied();
+
+            AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+                leaderInfo.set(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            mockEndpointLeader.reset();
+            mockEndpointLeader.expectedMinimumMessageCount(1);
+
+            // Simulate the file system becoming detached by moving the lock directory
+            Files.move(clusterDir, clusterMovedLocation, StandardCopyOption.REPLACE_EXISTING);
+
+            // Wait for leadership to be relinquished
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            // Stop the cluster leader to simulate it going 'offline' while the lock file system is detached
+            clusterLeader.stop();
+
+            // Make the cluster data file appear stale (i.e. not updated within acceptable bounds)
+            long staleHeartbeatTimestamp = leaderInfo.get().getHeartbeatNanoseconds() - TimeUnit.SECONDS.toNanos(100);
+
+            // NOTE(review): the 2ns heartbeat update interval presumably keeps the staleness window
+            // (interval x heartbeat timeout multiplier) negligible - confirm against FileLockClusterUtils.isLeaderStale
+            FileLockClusterLeaderInfo updatedInfo
+                    = new FileLockClusterLeaderInfo(
+                            leaderInfo.get().getId(), TimeUnit.NANOSECONDS.toNanos(2), staleHeartbeatTimestamp);
+
+            // Overwrite the moved copy of the real cluster data file. The name must match dataFile
+            // (NAMESPACE + ".dat"), otherwise the stale info lands in an unrelated file the follower never reads
+            Path data = clusterMovedLocation.resolve(dataFile.getFileName());
+            try (RandomAccessFile file = new RandomAccessFile(data.toFile(), "rw")) {
+                FileLockClusterUtils.writeClusterLeaderInfo(data, file.getChannel(), updatedInfo,
+                        true);
+            }
+
+            // Simulate reattaching the file system by moving the cluster directory back to the original location
+            Files.move(clusterMovedLocation, clusterDir, StandardCopyOption.REPLACE_EXISTING);
+
+            mockEndpointFollower.expectedMinimumMessageCount(1);
+
+            // Since the lock file is considered 'stale', the follower should be elected the leader
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterFollower).isLeader());
+                mockEndpointFollower.assertIsSatisfied();
+            });
+        } finally {
+            clusterFollower.stop();
+            // Ensure the leader is also stopped if the test fails before the explicit stop above
+            clusterLeader.stop();
+        }
+    }
+}
diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
new file mode 100644
index 00000000000..72b49bc1b48
--- /dev/null
+++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceBasicFailoverTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic {@link FileLockClusterService} scenarios: leader election with one or two members, failover when the leader
+ * CamelContext stops, recovery after the cluster data file is truncated, and rejection of invalid heartbeat timeout
+ * multipliers.
+ */
+class FileLockClusterServiceBasicFailoverTest extends FileLockClusterServiceTestBase {
+    @Test
+    void singleClusterMemberLeaderElection() throws Exception {
+        try (CamelContext clusterLeader = createCamelContext()) {
+            MockEndpoint mockEndpoint = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpoint.expectedMessageCount(5);
+
+            clusterLeader.start();
+
+            mockEndpoint.assertIsSatisfied();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+        }
+
+        assertEquals(0, Files.size(dataFile));
+    }
+
+    @Test
+    void multiClusterMemberLeaderElection() throws Exception {
+        CamelContext clusterLeader = createCamelContext();
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setAcquireLockDelay(2);
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointClustered = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpointClustered.expectedMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointClustered.assertIsSatisfied();
+
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                String leaderId = clusterLeaderInfo.getId();
+                assertNotNull(leaderId);
+                assertDoesNotThrow(() -> UUID.fromString(leaderId));
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+        } finally {
+            clusterFollower.stop();
+            clusterLeader.stop();
+        }
+
+        assertEquals(0, Files.size(dataFile));
+    }
+
+    @Test
+    void clusterFailoverWhenLeaderCamelContextStopped() throws Exception {
+        CamelContext clusterLeader = createCamelContext();
+
+        ClusterConfig followerConfig = new ClusterConfig();
+        followerConfig.setAcquireLockDelay(2);
+        CamelContext clusterFollower = createCamelContext(followerConfig);
+
+        try {
+            MockEndpoint mockEndpointClustered = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpointClustered.expectedMessageCount(5);
+
+            clusterLeader.start();
+            clusterFollower.start();
+
+            mockEndpointClustered.assertIsSatisfied();
+
+            AtomicReference<String> leaderId = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                assertNotNull(clusterLeaderInfo.getId());
+                assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId()));
+                leaderId.set(clusterLeaderInfo.getId());
+            });
+
+            // Wait enough time for the follower to have run its lock acquisition scheduled task
+            Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+            // The follower should not have produced any messages
+            MockEndpoint mockEndpointFollower = clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+            assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+            // Stop the cluster leader
+            clusterLeader.stop();
+
+            // Verify the follower was elected as the new cluster leader
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterFollower).isLeader());
+
+                FileLockClusterLeaderInfo updatedClusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(updatedClusterLeaderInfo);
+
+                String newLeaderId = updatedClusterLeaderInfo.getId();
+                assertNotNull(newLeaderId);
+                assertDoesNotThrow(() -> UUID.fromString(newLeaderId));
+                assertNotEquals(leaderId.get(), newLeaderId);
+                assertEquals(5, mockEndpointFollower.getExchanges().size());
+            });
+        } finally {
+            clusterFollower.stop();
+            // Ensure the leader is also stopped if the test fails before the explicit stop above
+            clusterLeader.stop();
+        }
+
+        assertEquals(0, Files.size(dataFile));
+    }
+
+    @Test
+    void singleClusterMemberRecoversLeadershipIfUUIDRemovedFromLockFile() throws Exception {
+        try (CamelContext clusterLeader = createCamelContext()) {
+            MockEndpoint mockEndpoint = clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+            mockEndpoint.expectedMinimumMessageCount(1);
+
+            clusterLeader.start();
+
+            mockEndpoint.assertIsSatisfied();
+
+            AtomicReference<String> leaderId = new AtomicReference<>();
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(Files.exists(lockFile));
+                assertTrue(Files.exists(dataFile));
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo clusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(clusterLeaderInfo);
+
+                assertNotNull(clusterLeaderInfo.getId());
+                assertDoesNotThrow(() -> UUID.fromString(clusterLeaderInfo.getId()));
+                leaderId.set(clusterLeaderInfo.getId());
+            });
+
+            // Truncate the cluster data file (not the lock file), removing the leader UUID
+            Files.write(dataFile, new byte[0]);
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                // Leadership should be lost
+                assertFalse(getClusterMember(clusterLeader).isLeader());
+            });
+
+            mockEndpoint.reset();
+            mockEndpoint.expectedMinimumMessageCount(1);
+
+            // Await recovery
+            Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+                assertTrue(getClusterMember(clusterLeader).isLeader());
+
+                FileLockClusterLeaderInfo recoveredClusterLeaderInfo = FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+                assertNotNull(recoveredClusterLeaderInfo);
+
+                String recoveredLeaderId = recoveredClusterLeaderInfo.getId();
+                assertNotNull(recoveredLeaderId);
+                assertDoesNotThrow(() -> UUID.fromString(recoveredLeaderId));
+                assertEquals(leaderId.get(), recoveredLeaderId);
+
+                mockEndpoint.assertIsSatisfied();
+            });
+        }
+
+        assertEquals(0, Files.size(dataFile));
+    }
+
+    @Test
+    void negativeHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+        assertHeartbeatTimeoutMultiplierRejected(-1);
+    }
+
+    @Test
+    void zeroHeartbeatTimeoutMultiplierThrowsException() throws Exception {
+        assertHeartbeatTimeoutMultiplierRejected(0);
+    }
+
+    /**
+     * Asserts that starting a CamelContext whose cluster service uses the given heartbeat timeout multiplier fails,
+     * with an {@link IllegalArgumentException} as the direct cause of the thrown exception.
+     */
+    private void assertHeartbeatTimeoutMultiplierRejected(int heartbeatTimeoutMultiplier) {
+        ClusterConfig config = new ClusterConfig();
+        config.setHeartbeatTimeoutMultiplier(heartbeatTimeoutMultiplier);
+
+        Exception exception = assertThrows(Exception.class, () -> {
+            try (CamelContext camelContext = createCamelContext(config)) {
+                camelContext.start();
+            }
+        });
+        assertIsInstanceOf(IllegalArgumentException.class, exception.getCause());
+    }
+}
diff --git a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
new file mode 100644
index 00000000000..72290aaacbb
--- /dev/null
+++ b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceTestBase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Base class for {@link FileLockClusterService} tests. Each test gets a fresh temporary cluster directory, the paths
+ * of the namespace lock and data files inside it, and factory methods for CamelContexts that run a clustered
+ * ({@code master:}) timer route delivering to {@code mock:result}.
+ */
+abstract class FileLockClusterServiceTestBase {
+    protected static final String NAMESPACE = "test-ns";
+
+    @TempDir
+    protected Path clusterDir;
+    protected Path lockFile;
+    protected Path dataFile;
+
+    @BeforeEach
+    public void beforeEach() {
+        lockFile = clusterDir.resolve(NAMESPACE);
+        dataFile = clusterDir.resolve(NAMESPACE + ".dat");
+    }
+
+    /**
+     * Creates an unstarted CamelContext using the default {@link ClusterConfig}.
+     */
+    protected CamelContext createCamelContext() throws Exception {
+        return createCamelContext(new ClusterConfig());
+    }
+
+    /**
+     * Creates an unstarted CamelContext with a {@link FileLockClusterService} rooted at {@link #clusterDir} and a
+     * clustered timer route whose repeat count comes from the given config.
+     */
+    protected CamelContext createCamelContext(ClusterConfig config) throws Exception {
+        CamelContext context = new DefaultCamelContext();
+        context.addService(createFileLockClusterService(config));
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("master:%s:timer:clustered?delay=1&period=100&repeatCount=%d", NAMESPACE, config.getTimerRepeatCount())
+                        .routeId("clustered")
+                        .log(LoggingLevel.DEBUG, "Timer fired for ${camelId}")
+                        .to("mock:result");
+            }
+        });
+        return context;
+    }
+
+    /**
+     * Creates a {@link FileLockClusterService} configured from the given {@link ClusterConfig}.
+     */
+    protected FileLockClusterService createFileLockClusterService(ClusterConfig config) {
+        FileLockClusterService service = new FileLockClusterService();
+        service.setAcquireLockDelay(config.getAcquireLockDelay());
+        service.setAcquireLockInterval(config.getAcquireLockInterval());
+        service.setRoot(clusterDir.toString());
+        service.setHeartbeatTimeoutMultiplier(config.getHeartbeatTimeoutMultiplier());
+        return service;
+    }
+
+    protected CamelClusterMember getClusterMember(CamelContext camelContext) throws Exception {
+        return getClusterView(camelContext).getLocalMember();
+    }
+
+    protected CamelClusterView getClusterView(CamelContext camelContext) throws Exception {
+        FileLockClusterService fileLockClusterService = camelContext.hasService(FileLockClusterService.class);
+        return fileLockClusterService.getView(NAMESPACE);
+    }
+
+    /**
+     * Mutable per-member settings used when building a test CamelContext. The acquire lock delay / interval values
+     * are passed unchanged to the corresponding {@link FileLockClusterService} setters.
+     */
+    static final class ClusterConfig {
+        private long acquireLockDelay = 1;
+        private long acquireLockInterval = 1;
+        private long timerRepeatCount = 5;
+        private int heartbeatTimeoutMultiplier = 5;
+
+        long getAcquireLockDelay() {
+            return acquireLockDelay;
+        }
+
+        void setAcquireLockDelay(long acquireLockDelay) {
+            this.acquireLockDelay = acquireLockDelay;
+        }
+
+        long getAcquireLockInterval() {
+            return acquireLockInterval;
+        }
+
+        void setAcquireLockInterval(long acquireLockInterval) {
+            this.acquireLockInterval = acquireLockInterval;
+        }
+
+        /** Value used for the {@code repeatCount} option of the clustered timer route. */
+        long getTimerRepeatCount() {
+            return timerRepeatCount;
+        }
+
+        void setTimerRepeatCount(long timerRepeatCount) {
+            this.timerRepeatCount = timerRepeatCount;
+        }
+
+        /**
+         * The acquire lock delay interpreted as seconds and converted to milliseconds, plus a 500ms safety margin.
+         */
+        long getStartupDelayWithOffsetMillis() {
+            return TimeUnit.SECONDS.toMillis(getAcquireLockDelay()) + 500;
+        }
+
+        int getHeartbeatTimeoutMultiplier() {
+            return heartbeatTimeoutMultiplier;
+        }
+
+        void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
+            this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
+        }
+    }
+}
