This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cec9957294cf CAMEL-22784: Recreate file lock cluster directory if
deletion detected
cec9957294cf is described below
commit cec9957294cf6318f436d3075cad7e55e23ae1e7
Author: James Netherton <[email protected]>
AuthorDate: Tue Dec 16 07:00:42 2025 +0000
CAMEL-22784: Recreate file lock cluster directory if deletion detected
---
.../file/cluster/FileLockClusterView.java | 5 +
...FileLockClusterServiceAdvancedFailoverTest.java | 127 +++++++++++++++++++--
2 files changed, 125 insertions(+), 7 deletions(-)
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index 64701a750b85..5ce7c554ebc7 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -246,6 +246,11 @@ public class FileLockClusterView extends
AbstractCamelClusterView {
return;
}
+ // Try to recreate the cluster data directory in case it
got removed
+ if (!Files.exists(leaderLockPath.getParent())) {
+ Files.createDirectories(leaderLockPath.getParent());
+ }
+
// Attempt to obtain cluster leadership
LOGGER.debug("Try to acquire a lock on {}
(cluster-member-id={})", leaderLockPath, localMember.getUuid());
leaderLockFile = new
RandomAccessFile(leaderLockPath.toFile(), "rw");
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
index 515cee417a84..c335681eca85 100644
---
a/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
+++
b/components/camel-master/src/test/java/org/apache/camel/component/file/cluster/FileLockClusterServiceAdvancedFailoverTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.file.cluster;
+import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -24,9 +25,11 @@ import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
import org.apache.camel.CamelContext;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.FileUtil;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -178,7 +181,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceT
}
@Test
- void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir
Path clusterMovedLocation) throws Exception {
+ void multipleClusterMembersReelectLeaderIfClusterDataDirectoryDeleted()
throws Exception {
ClusterConfig leaderConfig = new ClusterConfig();
leaderConfig.setTimerRepeatCount(-1);
@@ -192,7 +195,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceT
try {
MockEndpoint mockEndpointLeader =
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
- mockEndpointLeader.expectedMessageCount(5);
+ mockEndpointLeader.expectedMinimumMessageCount(1);
clusterLeader.start();
clusterFollower.start();
@@ -222,16 +225,113 @@ class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceT
mockEndpointLeader.reset();
mockEndpointLeader.expectedMinimumMessageCount(1);
- // Simulate the file system becoming detached by moving the
cluster data directory
- Files.move(clusterDir, clusterMovedLocation,
StandardCopyOption.REPLACE_EXISTING);
+ // Delete the cluster data directory
+ FileUtil.removeDir(clusterDir.toFile());
// Wait for leadership to be relinquished
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(()
-> {
assertFalse(getClusterMember(clusterLeader).isLeader());
});
+ // Wait for leadership to be gained by one of the members
+ CamelContext oldLeader = clusterLeader;
+ Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(()
-> {
+ boolean newLeaderElected = false;
+
+ // Original cluster leader regained leadership
+ if (getClusterMember(oldLeader).isLeader()) {
+ newLeaderElected = true;
+ mockEndpointLeader.assertIsSatisfied();
+ }
+
+ // A different cluster member gained leadership
+ if (getClusterMember(clusterFollower).isLeader()) {
+ newLeaderElected = true;
+ mockEndpointFollower.assertIsSatisfied();
+ }
+
+ assertTrue(newLeaderElected);
+ });
+ } finally {
+ clusterLeader.stop();
+ clusterFollower.stop();
+ }
+ }
+
+ @Test
+ void notStaleLockFileForRestoredFileSystemElectsOriginalLeader(@TempDir
Path clusterMovedLocation) throws Exception {
+ ClusterConfig leaderConfig = new ClusterConfig();
+ leaderConfig.setTimerRepeatCount(-1);
+
+ CamelContext clusterLeader = createCamelContext(leaderConfig);
+
+ ClusterConfig followerConfig = new ClusterConfig();
+ followerConfig.setTimerRepeatCount(-1);
+ followerConfig.setAcquireLockDelay(2);
+
+ CamelContext clusterFollower = createCamelContext(followerConfig);
+
+ try {
+ MockEndpoint mockEndpointLeader =
clusterLeader.getEndpoint("mock:result", MockEndpoint.class);
+ mockEndpointLeader.expectedMessageCount(5);
+
+ clusterLeader.start();
+ clusterFollower.start();
+
+ mockEndpointLeader.assertIsSatisfied();
+
+ AtomicReference<FileLockClusterLeaderInfo> leaderInfo = new
AtomicReference<>();
+ Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(()
-> {
+ assertTrue(Files.exists(lockFile));
+ assertTrue(Files.exists(dataFile));
+ assertTrue(getClusterMember(clusterLeader).isLeader());
+
+ FileLockClusterLeaderInfo clusterLeaderInfo =
FileLockClusterUtils.readClusterLeaderInfo(dataFile);
+ assertNotNull(clusterLeaderInfo);
+ leaderInfo.set(clusterLeaderInfo);
+
+ String leaderId = clusterLeaderInfo.getId();
+ assertNotNull(leaderId);
+ assertDoesNotThrow(() -> UUID.fromString(leaderId));
+ });
+
+ // Wait enough time for the follower to have run its lock
acquisition scheduled task
+ Thread.sleep(followerConfig.getStartupDelayWithOffsetMillis());
+
+ // The follower should not have produced any messages
+ MockEndpoint mockEndpointFollower =
clusterFollower.getEndpoint("mock:result", MockEndpoint.class);
+ assertTrue(mockEndpointFollower.getExchanges().isEmpty());
+
+ mockEndpointLeader.reset();
+ mockEndpointLeader.expectedMinimumMessageCount(1);
+
+ // Simulate the file system becoming detached by moving the
cluster data directory
+ Files.move(clusterDir, clusterMovedLocation,
StandardCopyOption.REPLACE_EXISTING);
+
// Simulate reattaching the file system by moving the cluster
directory back to the original location
- Files.move(clusterMovedLocation, clusterDir,
StandardCopyOption.REPLACE_EXISTING);
+ try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
+ stream.forEach(path -> {
+ try {
+ Path destination =
clusterDir.resolve(clusterMovedLocation.relativize(path));
+ if (Files.isDirectory(path)) {
+ Files.createDirectories(destination);
+ } else {
+ Files.copy(path, destination,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ FileLockClusterLeaderInfo updatedInfo
+ = new FileLockClusterLeaderInfo(
+ leaderInfo.get().getId(),
TimeUnit.MILLISECONDS.toMillis(2), System.currentTimeMillis());
+ Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
+ try (RandomAccessFile file = new RandomAccessFile(data.toFile(),
"rw")) {
+ FileLockClusterUtils.writeClusterLeaderInfo(data,
file.getChannel(), updatedInfo,
+ true);
+ }
// Since the lock file is not considered 'stale', the original
leader should resume leadership
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(()
-> {
@@ -309,7 +409,7 @@ class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceT
FileLockClusterLeaderInfo updatedInfo
= new FileLockClusterLeaderInfo(
- leaderInfo.get().getId(),
TimeUnit.NANOSECONDS.toNanos(2), staleHeartbeatTimestamp);
+ leaderInfo.get().getId(),
TimeUnit.MILLISECONDS.toMillis(2), staleHeartbeatTimestamp);
Path data = clusterMovedLocation.resolve(NAMESPACE + ".data");
try (RandomAccessFile file = new RandomAccessFile(data.toFile(),
"rw")) {
FileLockClusterUtils.writeClusterLeaderInfo(data,
file.getChannel(), updatedInfo,
@@ -317,7 +417,20 @@ class FileLockClusterServiceAdvancedFailoverTest extends
FileLockClusterServiceT
}
// Simulate reattaching the file system by moving the cluster
directory back to the original location
- Files.move(clusterMovedLocation, clusterDir,
StandardCopyOption.REPLACE_EXISTING);
+ try (Stream<Path> stream = Files.walk(clusterMovedLocation)) {
+ stream.forEach(path -> {
+ try {
+ Path destination =
clusterDir.resolve(clusterMovedLocation.relativize(path));
+ if (Files.isDirectory(path)) {
+ Files.createDirectories(destination);
+ } else {
+ Files.copy(path, destination,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
mockEndpointFollower.expectedMinimumMessageCount(1);