This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-20199/remove-synchronized-blocks-from-d2f-components
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9bebcceff4f3a4311b526c728b825ab1fabaa1ca
Author: Nicolas Filotto <[email protected]>
AuthorDate: Wed Sep 4 08:32:27 2024 +0200

    CAMEL-20199: Remove synchronized block from components D to F
---
 .../camel/component/dhis2/Dhis2Component.java      |   5 +-
 .../camel/component/direct/DirectComponent.java    |  40 +-
 .../component/disruptor/DisruptorComponent.java    |  10 +-
 .../component/disruptor/DisruptorEndpoint.java     |  10 +-
 .../component/disruptor/DisruptorReference.java    | 151 +++---
 .../camel/component/es/ElasticsearchProducer.java  |   8 +-
 .../etcd3/cloud/Etcd3WatchServiceDiscovery.java    |   9 +-
 .../component/etcd3/policy/Etcd3RoutePolicy.java   |  56 +--
 .../camel/component/file/FileOperations.java       |   8 +-
 .../camel/component/file/GenericFileProducer.java  |   2 +-
 .../file/cluster/FileLockClusterService.java       |  24 +-
 .../file/cluster/FileLockClusterView.java          |  13 +-
 .../component/freemarker/FreemarkerComponent.java  |  57 ++-
 .../component/file/remote/SftpOperations.java      | 504 ++++++++++++---------
 .../apache/camel/support/service/BaseService.java  |   3 +
 15 files changed, 541 insertions(+), 359 deletions(-)

diff --git a/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java b/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java
index 345e125fbb6..6e8e026206e 100644
--- a/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java
+++ b/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java
@@ -66,11 +66,14 @@ public class Dhis2Component extends AbstractApiComponent<Dhis2ApiName, Dhis2Conf
 
     public Dhis2Client getClient(Dhis2Configuration endpointConfiguration) {
         if (endpointConfiguration.equals(this.configuration)) {
-            synchronized (this) {
+            lock.lock();
+            try {
                 if (this.dhis2Client == null) {
                     this.dhis2Client = 
Dhis2ClientBuilder.newClient(endpointConfiguration.getBaseApiUrl(),
                             endpointConfiguration.getUsername(), 
endpointConfiguration.getPassword()).build();
                 }
+            } finally {
+                lock.unlock();
             }
 
             return this.dhis2Client;
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 403e3f65242..1ab1599c61a 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -16,8 +16,13 @@
  */
 package org.apache.camel.component.direct;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
@@ -34,12 +39,14 @@ import org.apache.camel.util.StopWatch;
 public class DirectComponent extends DefaultComponent {
 
     // active consumers
-    private final Map<String, DirectConsumer> consumers = new HashMap<>();
+    private final Lock consumersLock = new ReentrantLock();
+    private final Condition consumersCondition = consumersLock.newCondition();
+    private final Map<String, DirectConsumer> consumers = new 
ConcurrentHashMap<>();
     // counter that is used for producers to keep track if any consumer was added/removed since they last checked
     // this is used for optimization to avoid each producer to get consumer for each message processed
     // (locking via synchronized, and then lookup in the map as the cost)
     // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled
-    private volatile int stateCounter;
+    private final AtomicInteger stateCounter = new AtomicInteger();
 
     @Metadata(label = "producer", defaultValue = "true")
     private boolean block = true;
@@ -89,33 +96,40 @@ public class DirectComponent extends DefaultComponent {
     }
 
     int getStateCounter() {
-        return stateCounter;
+        return stateCounter.get();
     }
 
     public void addConsumer(String key, DirectConsumer consumer) {
-        synchronized (consumers) {
+        consumersLock.lock();
+        try {
             if (consumers.putIfAbsent(key, consumer) != null) {
                 throw new IllegalArgumentException(
                         "Cannot add a 2nd consumer to the same endpoint: " + 
key
                                                    + ". DirectEndpoint only 
allows one consumer.");
             }
             // state changed so inc counter
-            stateCounter++;
-            consumers.notifyAll();
+            stateCounter.incrementAndGet();
+            consumersCondition.signalAll();
+        } finally {
+            consumersLock.unlock();
         }
     }
 
     public void removeConsumer(String key, DirectConsumer consumer) {
-        synchronized (consumers) {
+        consumersLock.lock();
+        try {
             consumers.remove(key, consumer);
             // state changed so inc counter
-            stateCounter++;
-            consumers.notifyAll();
+            stateCounter.incrementAndGet();
+            consumersCondition.signalAll();
+        } finally {
+            consumersLock.unlock();
         }
     }
 
     protected DirectConsumer getConsumer(String key, boolean block, long 
timeout) throws InterruptedException {
-        synchronized (consumers) {
+        consumersLock.lock();
+        try {
             DirectConsumer answer = consumers.get(key);
             if (answer == null && block) {
                 StopWatch watch = new StopWatch();
@@ -128,10 +142,12 @@ public class DirectComponent extends DefaultComponent {
                     if (rem <= 0) {
                         break;
                     }
-                    consumers.wait(rem);
+                    consumersCondition.await(rem, TimeUnit.MILLISECONDS);
                 }
             }
             return answer;
+        } finally {
+            consumersLock.unlock();
         }
     }
 
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java
index ae365320e85..1d8103297ed 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java
@@ -133,7 +133,8 @@ public class DisruptorComponent extends DefaultComponent {
         }
         sizeToUse = powerOfTwo(sizeToUse);
 
-        synchronized (this) {
+        lock.lock();
+        try {
             DisruptorReference ref = getDisruptors().get(key);
             if (ref == null) {
                 LOGGER.debug("Creating new disruptor for key {}", key);
@@ -151,6 +152,8 @@ public class DisruptorComponent extends DefaultComponent {
             }
 
             return ref;
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -171,8 +174,11 @@ public class DisruptorComponent extends DefaultComponent {
 
     @Override
     protected void doStop() throws Exception {
-        synchronized (this) {
+        lock.lock();
+        try {
             getDisruptors().clear();
+        } finally {
+            lock.unlock();
         }
         super.doStop();
     }
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
index 1d1328da242..fdc769ec82b 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java
@@ -284,7 +284,8 @@ public class DisruptorEndpoint extends DefaultEndpoint implements AsyncEndpoint,
     }
 
     void onStarted(final DisruptorConsumer consumer) throws Exception {
-        synchronized (this) {
+        lock.lock();
+        try {
             // validate multiple consumers have been enabled is necessary
             if (!consumers.isEmpty() && !isMultipleConsumersSupported()) {
                 throw new IllegalStateException(
@@ -297,17 +298,22 @@ public class DisruptorEndpoint extends DefaultEndpoint implements AsyncEndpoint,
                 LOGGER.debug("Tried to start Consumer {} on endpoint {} but it 
was already started", consumer,
                         getEndpointUri());
             }
+        } finally {
+            lock.unlock();
         }
     }
 
     void onStopped(final DisruptorConsumer consumer) throws Exception {
-        synchronized (this) {
+        lock.lock();
+        try {
             if (consumers.remove(consumer)) {
                 LOGGER.debug("Stopping consumer {} on endpoint {}", consumer, 
getEndpointUri());
                 getDisruptor().reconfigure();
             } else {
                 LOGGER.debug("Tried to stop Consumer {} on endpoint {} but it 
was already stopped", consumer, getEndpointUri());
             }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java
index ce09adfe3b5..ae466834554 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java
@@ -32,7 +32,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicMarkableReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.RingBuffer;
@@ -73,6 +75,7 @@ public class DisruptorReference {
     private final DisruptorWaitStrategy waitStrategy;
 
     private final Queue<Exchange> temporaryExchangeBuffer;
+    private final Lock lock = new ReentrantLock();
 
     //access guarded by this
     private ExecutorService executor;
@@ -144,11 +147,16 @@ public class DisruptorReference {
         ringBuffer.publish(sequence);
     }
 
-    public synchronized void reconfigure() throws Exception {
-        LOGGER.debug("Reconfiguring disruptor {}", this);
-        shutdownDisruptor(true);
+    public void reconfigure() throws Exception {
+        lock.lock();
+        try {
+            LOGGER.debug("Reconfiguring disruptor {}", this);
+            shutdownDisruptor(true);
 
-        start();
+            start();
+        } finally {
+            lock.unlock();
+        }
     }
 
     private void start() throws Exception {
@@ -282,52 +290,62 @@ public class DisruptorReference {
         }
     }
 
-    private synchronized void shutdownDisruptor(boolean isReconfiguring) {
-        LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, 
isReconfiguring);
-        Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference();
-        disruptor.set(null, isReconfiguring);
-
-        if (currentDisruptor != null) {
-            //check if we had a blocking event handler to keep an empty 
disruptor 'busy'
-            if (handlers != null && handlers.length == 1
-                    && handlers[0] instanceof BlockingExchangeEventHandler 
blockingExchangeEventHandler) {
-                // yes we did, unblock it so we can get rid of our backlog,
-                // The eventhandler will empty its pending exchanges in our 
temporary buffer
-                blockingExchangeEventHandler.unblock();
-            }
+    private void shutdownDisruptor(boolean isReconfiguring) {
+        lock.lock();
+        try {
+            LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", 
this, isReconfiguring);
+            Disruptor<ExchangeEvent> currentDisruptor = 
disruptor.getReference();
+            disruptor.set(null, isReconfiguring);
+
+            if (currentDisruptor != null) {
+                //check if we had a blocking event handler to keep an empty 
disruptor 'busy'
+                if (handlers != null && handlers.length == 1
+                        && handlers[0] instanceof BlockingExchangeEventHandler 
blockingExchangeEventHandler) {
+                    // yes we did, unblock it so we can get rid of our backlog,
+                    // The eventhandler will empty its pending exchanges in 
our temporary buffer
+                    blockingExchangeEventHandler.unblock();
+                }
 
-            currentDisruptor.shutdown();
-
-            //they have already been given a trigger to halt when they are 
done by shutting down the disruptor
-            //we do however want to await their completion before they are 
scheduled to process events from the new
-            for (final LifecycleAwareExchangeEventHandler eventHandler : 
handlers) {
-                boolean eventHandlerFinished = false;
-                //the disruptor is now empty and all consumers are either done 
or busy processing their last exchange
-                while (!eventHandlerFinished) {
-                    try {
-                        //The disruptor shutdown command executed above should 
have triggered a halt signal to all
-                        //event processors which, in their death, should 
notify our event handlers. They respond by
-                        //switching a latch and we want to await that latch 
here to make sure they are done.
-                        if (!eventHandler.awaitStopped(10, TimeUnit.SECONDS)) {
-                            //we wait for a relatively long, but limited 
amount of time to prevent an application using
-                            //this component from hanging indefinitely
-                            //Please report a bug if you can repruduce this
-                            LOGGER.error("Disruptor/event handler failed to 
shut down properly, PLEASE REPORT");
+                currentDisruptor.shutdown();
+
+                //they have already been given a trigger to halt when they are 
done by shutting down the disruptor
+                //we do however want to await their completion before they are 
scheduled to process events from the new
+                for (final LifecycleAwareExchangeEventHandler eventHandler : 
handlers) {
+                    boolean eventHandlerFinished = false;
+                    //the disruptor is now empty and all consumers are either 
done or busy processing their last exchange
+                    while (!eventHandlerFinished) {
+                        try {
+                            //The disruptor shutdown command executed above 
should have triggered a halt signal to all
+                            //event processors which, in their death, should 
notify our event handlers. They respond by
+                            //switching a latch and we want to await that 
latch here to make sure they are done.
+                            if (!eventHandler.awaitStopped(10, 
TimeUnit.SECONDS)) {
+                                //we wait for a relatively long, but limited 
amount of time to prevent an application using
+                                //this component from hanging indefinitely
+                                //Please report a bug if you can repruduce this
+                                LOGGER.error("Disruptor/event handler failed 
to shut down properly, PLEASE REPORT");
+                            }
+                            eventHandlerFinished = true;
+                        } catch (InterruptedException e) {
+                            LOGGER.info("Interrupted while waiting for the 
shutdown to complete");
+                            Thread.currentThread().interrupt();
                         }
-                        eventHandlerFinished = true;
-                    } catch (InterruptedException e) {
-                        LOGGER.info("Interrupted while waiting for the 
shutdown to complete");
-                        Thread.currentThread().interrupt();
                     }
                 }
-            }
 
-            handlers = new LifecycleAwareExchangeEventHandler[0];
+                handlers = new LifecycleAwareExchangeEventHandler[0];
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
-    private synchronized void shutdownExecutor() {
-        resizeThreadPoolExecutor(0);
+    private void shutdownExecutor() {
+        lock.lock();
+        try {
+            resizeThreadPoolExecutor(0);
+        } finally {
+            lock.unlock();
+        }
     }
 
     public String getName() {
@@ -361,28 +379,43 @@ public class DisruptorReference {
         return temporaryExchangeBuffer.size();
     }
 
-    public synchronized void addEndpoint(final DisruptorEndpoint 
disruptorEndpoint) {
-        LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint);
-        endpoints.add(disruptorEndpoint);
-        LOGGER.debug("Endpoint added: {}, new total endpoints {}", 
disruptorEndpoint, endpoints.size());
+    public void addEndpoint(final DisruptorEndpoint disruptorEndpoint) {
+        lock.lock();
+        try {
+            LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint);
+            endpoints.add(disruptorEndpoint);
+            LOGGER.debug("Endpoint added: {}, new total endpoints {}", 
disruptorEndpoint, endpoints.size());
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized void removeEndpoint(final DisruptorEndpoint 
disruptorEndpoint) {
-        LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint);
-        if (getEndpointCount() == 1) {
-            LOGGER.debug("Last Endpoint removed, shutdown disruptor");
-            //Shutdown our disruptor
-            shutdownDisruptor(false);
-
-            //As there are no endpoints dependent on this Disruptor, we may 
also shutdown our executor
-            shutdownExecutor();
+    public void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) {
+        lock.lock();
+        try {
+            LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint);
+            if (getEndpointCount() == 1) {
+                LOGGER.debug("Last Endpoint removed, shutdown disruptor");
+                //Shutdown our disruptor
+                shutdownDisruptor(false);
+
+                //As there are no endpoints dependent on this Disruptor, we 
may also shutdown our executor
+                shutdownExecutor();
+            }
+            endpoints.remove(disruptorEndpoint);
+            LOGGER.debug("Endpoint removed: {}, new total endpoints {}", 
disruptorEndpoint, getEndpointCount());
+        } finally {
+            lock.unlock();
         }
-        endpoints.remove(disruptorEndpoint);
-        LOGGER.debug("Endpoint removed: {}, new total endpoints {}", 
disruptorEndpoint, getEndpointCount());
     }
 
-    public synchronized int getEndpointCount() {
-        return endpoints.size();
+    public int getEndpointCount() {
+        lock.lock();
+        try {
+            return endpoints.size();
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
index 64ee8bb3157..59f14813128 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
@@ -21,6 +21,8 @@ import java.security.KeyStore;
 import java.security.cert.Certificate;
 import java.security.cert.CertificateFactory;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;
@@ -80,7 +82,6 @@ class ElasticsearchProducer extends DefaultAsyncProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchProducer.class);
 
     protected final ElasticsearchConfiguration configuration;
-    private final Object mutex = new Object();
     private volatile RestClient client;
     private Sniffer sniffer;
 
@@ -477,7 +478,8 @@ class ElasticsearchProducer extends DefaultAsyncProducer {
 
     private void startClient() {
         if (client == null) {
-            synchronized (mutex) {
+            lock.lock();
+            try {
                 if (client == null) {
                     LOG.info("Connecting to the ElasticSearch cluster: {}", 
configuration.getClusterName());
                     if (configuration.getHostAddressesList() != null
@@ -487,6 +489,8 @@ class ElasticsearchProducer extends DefaultAsyncProducer {
                         LOG.warn("Incorrect ip address and port parameters 
settings for ElasticSearch cluster");
                     }
                 }
+            } finally {
+                lock.unlock();
             }
         }
     }
diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java
index 08947a3c5ba..ee1bf2e89a6 100644
--- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java
+++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java
@@ -75,10 +75,6 @@ public class Etcd3WatchServiceDiscovery extends Etcd3ServiceDiscovery
      * The current watcher used to watch the changes of the service 
definitions.
      */
     private final AtomicReference<Watch.Watcher> watcher = new 
AtomicReference<>();
-    /**
-     * The mutex used to prevent concurrent load of the list of service 
definitions.
-     */
-    private final Object mutex = new Object();
 
     /**
      * Construct a {@code Etcd3WatchServiceDiscovery} with the given 
configuration.
@@ -109,12 +105,15 @@ public class Etcd3WatchServiceDiscovery extends Etcd3ServiceDiscovery
     public List<ServiceDefinition> getServices(String name) {
         List<ServiceDefinition> servers = allServices;
         if (servers == null) {
-            synchronized (mutex) {
+            lock.lock();
+            try {
                 servers = allServices;
                 if (servers == null) {
                     servers = reloadServices();
                     doWatch();
                 }
+            } finally {
+                lock.unlock();
             }
         }
 
diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java
index 76cd607dee1..8f38719e71a 100644
--- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java
+++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java
@@ -62,10 +62,6 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext
      * The logger
      */
     private static final Logger LOGGER = 
LoggerFactory.getLogger(Etcd3RoutePolicy.class);
-    /**
-     * The mutex used to prevent concurrent access to {@code suspendedRoutes}.
-     */
-    private final Object mutex = new Object();
     /**
      * The flag indicating whether the current node is a leader.
      */
@@ -173,15 +169,21 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext
 
     @Override
     public void onStop(Route route) {
-        synchronized (mutex) {
+        lock.lock();
+        try {
             suspendedRoutes.remove(route);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized void onSuspend(Route route) {
-        synchronized (mutex) {
+    public void onSuspend(Route route) {
+        lock.lock();
+        try {
             suspendedRoutes.remove(route);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -249,16 +251,17 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext
      * @param route the route for which the consumer should be stopped.
      */
     private void stopConsumer(Route route) {
-        synchronized (mutex) {
-            try {
-                if (!suspendedRoutes.contains(route)) {
-                    LOGGER.debug("Stopping consumer for {} ({})", 
route.getId(), route.getConsumer());
-                    stopConsumer(route.getConsumer());
-                    suspendedRoutes.add(route);
-                }
-            } catch (Exception e) {
-                handleException(e);
+        lock.lock();
+        try {
+            if (!suspendedRoutes.contains(route)) {
+                LOGGER.debug("Stopping consumer for {} ({})", route.getId(), 
route.getConsumer());
+                stopConsumer(route.getConsumer());
+                suspendedRoutes.add(route);
             }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -266,17 +269,18 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext
      * Start all the consumers that have been stopped.
      */
     private void startAllStoppedConsumers() {
-        synchronized (mutex) {
-            try {
-                for (Route suspendedRoute : suspendedRoutes) {
-                    LOGGER.debug("Starting consumer for {} ({})", 
suspendedRoute.getId(), suspendedRoute.getConsumer());
-                    startConsumer(suspendedRoute.getConsumer());
-                }
-
-                suspendedRoutes.clear();
-            } catch (Exception e) {
-                handleException(e);
+        lock.lock();
+        try {
+            for (Route suspendedRoute : suspendedRoutes) {
+                LOGGER.debug("Starting consumer for {} ({})", 
suspendedRoute.getId(), suspendedRoute.getConsumer());
+                startConsumer(suspendedRoute.getConsumer());
             }
+
+            suspendedRoutes.clear();
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java
index f08c96485ef..74bac17ccbf 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java
@@ -35,6 +35,8 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.Date;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadException;
@@ -53,6 +55,7 @@ import static org.apache.camel.component.file.GenericFileHelper.asExclusiveReadL
  */
 public class FileOperations implements GenericFileOperations<File> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FileOperations.class);
+    private final Lock lock = new ReentrantLock();
     private FileEndpoint endpoint;
 
     public FileOperations() {
@@ -189,7 +192,8 @@ public class FileOperations implements GenericFileOperations<File> {
 
         // We need to make sure that this is thread-safe and only one thread
         // tries to create the path directory at the same time.
-        synchronized (this) {
+        lock.lock();
+        try {
             if (path.isDirectory() && path.exists()) {
                 // the directory already exists
                 return true;
@@ -197,6 +201,8 @@ public class FileOperations implements GenericFileOperations<File> {
                 LOG.trace("Building directory: {}", path);
                 return buildDirectory(path, 
endpoint.getDirectoryPermissions(), absolute);
             }
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
index 6d1bed9e1bf..da7f8101fd3 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
@@ -47,7 +47,7 @@ public class GenericFileProducer<T> extends DefaultProducer {
     protected GenericFileOperations<T> operations;
     // assume writing to 100 different files concurrently at most for the same
     // file producer
-    private final Map<String, Lock> locks = 
Collections.synchronizedMap(LRUCacheFactory.newLRUCache(100));
+    private final Map<String, Lock> locks = LRUCacheFactory.newLRUCache(100);
 
     protected GenericFileProducer(GenericFileEndpoint<T> endpoint, 
GenericFileOperations<T> operations) {
         super(endpoint);
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
index fc5e949cf87..591ad0690f5 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java
@@ -18,9 +18,11 @@ package org.apache.camel.component.file.cluster;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.support.cluster.AbstractCamelClusterService;
+import org.apache.camel.support.service.BaseService;
 import org.apache.camel.util.ObjectHelper;
 
 public class FileLockClusterService extends 
AbstractCamelClusterService<FileLockClusterView> {
@@ -125,15 +127,21 @@ public class FileLockClusterService extends 
AbstractCamelClusterService<FileLock
         }
     }
 
-    synchronized ScheduledExecutorService getExecutor() {
-        if (executor == null) {
-            // Camel context should be set at this stage.
-            final CamelContext context = 
ObjectHelper.notNull(getCamelContext(), "CamelContext");
+    ScheduledExecutorService getExecutor() {
+        Lock internalLock = getInternalLock();
+        internalLock.lock();
+        try {
+            if (executor == null) {
+                // Camel context should be set at this stage.
+                final CamelContext context = 
ObjectHelper.notNull(getCamelContext(), "CamelContext");
 
-            executor = 
context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
-                    "FileLockClusterService-" + getId());
-        }
+                executor = context.getExecutorServiceManager()
+                        .newSingleThreadScheduledExecutor(this, 
"FileLockClusterService-" + getId());
+            }
 
-        return executor;
+            return executor;
+        } finally {
+            internalLock.unlock();
+        }
     }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
index 6c679a657e7..a2e9ad9dfd7 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java
@@ -29,6 +29,8 @@ import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.support.cluster.AbstractCamelClusterView;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
 public class FileLockClusterView extends AbstractCamelClusterView {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileLockClusterView.class);
 
+    private static final Lock LOCK = new ReentrantLock();
     private final ClusterMember localMember;
     private final Path path;
     private RandomAccessFile lockFile;
@@ -134,7 +137,8 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                     return;
                 }
 
-                synchronized (FileLockClusterView.this) {
+                LOCK.lock();
+                try {
                     if (lock != null) {
                         LOGGER.info("Lock on file {} lost (lock={})", path, 
lock);
                         fireLeadershipChangedEvent((CamelClusterMember) null);
@@ -152,6 +156,8 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
                     } else {
                         LOGGER.debug("Lock on file {} not acquired ", path);
                     }
+                } finally {
+                    LOCK.unlock();
                 }
             } catch (OverlappingFileLockException e) {
                 reason = new IOException(e);
@@ -169,8 +175,11 @@ public class FileLockClusterView extends 
AbstractCamelClusterView {
     private final class ClusterMember implements CamelClusterMember {
         @Override
         public boolean isLeader() {
-            synchronized (FileLockClusterView.this) {
+            LOCK.lock();
+            try {
                 return lock != null && lock.isValid();
+            } finally {
+                LOCK.unlock();
             }
         }
 
diff --git 
a/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java
 
b/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java
index d7ed57bccf8..c3f3a19efd7 100644
--- 
a/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java
+++ 
b/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java
@@ -85,24 +85,30 @@ public class FreemarkerComponent extends DefaultComponent {
         return endpoint;
     }
 
-    public synchronized Configuration getConfiguration() {
-        if (configuration == null) {
-            configuration = new Configuration(Configuration.VERSION_2_3_32);
-            configuration.setLocalizedLookup(isLocalizedLookup());
-            configuration.setTemplateLoader(new URLTemplateLoader() {
-                @Override
-                protected URL getURL(String name) {
-                    try {
-                        return 
ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext(), name);
-                    } catch (Exception e) {
-                        // freemarker prefers to ask for locale first (eg 
xxx_en_GB, xxX_en), and then fallback without locale
-                        // so we should return null to signal the resource 
could not be found
-                        return null;
+    public Configuration getConfiguration() {
+        lock.lock();
+        try {
+            if (configuration == null) {
+                configuration = new 
Configuration(Configuration.VERSION_2_3_32);
+                configuration.setLocalizedLookup(isLocalizedLookup());
+                configuration.setTemplateLoader(new URLTemplateLoader() {
+
+                    @Override
+                    protected URL getURL(String name) {
+                        try {
+                            return 
ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext(), name);
+                        } catch (Exception e) {
+                            // freemarker prefers to ask for locale first (eg 
xxx_en_GB, xxX_en), and then fallback without locale
+                            // so we should return null to signal the resource 
could not be found
+                            return null;
+                        }
                     }
-                }
-            });
+                });
+            }
+            return (Configuration) configuration.clone();
+        } finally {
+            lock.unlock();
         }
-        return (Configuration) configuration.clone();
     }
 
     /**
@@ -150,14 +156,19 @@ public class FreemarkerComponent extends DefaultComponent 
{
         this.localizedLookup = localizedLookup;
     }
 
-    private synchronized Configuration getNoCacheConfiguration() {
-        if (noCacheConfiguration == null) {
-            // create a clone of the regular configuration
-            noCacheConfiguration = (Configuration) getConfiguration().clone();
-            // set this one to not use cache
-            noCacheConfiguration.setCacheStorage(new NullCacheStorage());
+    private Configuration getNoCacheConfiguration() {
+        lock.lock();
+        try {
+            if (noCacheConfiguration == null) {
+                // create a clone of the regular configuration
+                noCacheConfiguration = (Configuration) 
getConfiguration().clone();
+                // set this one to not use cache
+                noCacheConfiguration.setCacheStorage(new NullCacheStorage());
+            }
+            return noCacheConfiguration;
+        } finally {
+            lock.unlock();
         }
-        return noCacheConfiguration;
     }
 
 }
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
index 7b4d77700b3..cda1af1413b 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
@@ -34,6 +34,8 @@ import java.util.Base64;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Vector;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import com.jcraft.jsch.ChannelSftp;
@@ -83,6 +85,7 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     private SftpEndpoint endpoint;
     private ChannelSftp channel;
     private Session session;
+    private final Lock lock = new ReentrantLock();
 
     private static class TaskPayload {
         final RemoteFileConfiguration configuration;
@@ -117,32 +120,35 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized boolean connect(RemoteFileConfiguration configuration, 
Exchange exchange)
+    public boolean connect(RemoteFileConfiguration configuration, Exchange 
exchange)
             throws GenericFileOperationFailedException {
-        if (isConnected()) {
-            // already connected
-            return true;
-        }
+        lock.lock();
+        try {
+            if (isConnected()) {
+                // already connected
+                return true;
+            }
 
-        BlockingTask task = Tasks
-                .foregroundTask()
-                .withBudget(Budgets.iterationBudget()
-                        
.withMaxIterations(Budgets.atLeastOnce(endpoint.getMaximumReconnectAttempts()))
-                        
.withInterval(Duration.ofMillis(endpoint.getReconnectDelay()))
-                        .build())
-                .build();
+            BlockingTask task = Tasks.foregroundTask()
+                    .withBudget(Budgets.iterationBudget()
+                            
.withMaxIterations(Budgets.atLeastOnce(endpoint.getMaximumReconnectAttempts()))
+                            
.withInterval(Duration.ofMillis(endpoint.getReconnectDelay()))
+                            .build())
+                    .build();
 
-        TaskPayload payload = new TaskPayload(configuration);
+            TaskPayload payload = new TaskPayload(configuration);
 
-        if (!task.run(this::tryConnect, payload)) {
-            throw new GenericFileOperationFailedException(
-                    "Cannot connect to " + 
configuration.remoteServerInformation(),
-                    payload.exception);
-        }
+            if (!task.run(this::tryConnect, payload)) {
+                throw new GenericFileOperationFailedException("Cannot connect 
to " + configuration.remoteServerInformation(),
+                        payload.exception);
+            }
 
-        configureBulkRequests();
+            configureBulkRequests();
 
-        return true;
+            return true;
+        } finally {
+            lock.unlock();
+        }
     }
 
     private boolean tryConnect(TaskPayload payload) {
@@ -489,22 +495,33 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized boolean isConnected() throws 
GenericFileOperationFailedException {
-        return session != null && session.isConnected() && channel != null && 
channel.isConnected();
+    public boolean isConnected() throws GenericFileOperationFailedException {
+        lock.lock();
+        try {
+            return session != null && session.isConnected() && channel != null 
&& channel.isConnected();
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
-    public synchronized void disconnect() throws 
GenericFileOperationFailedException {
-        if (session != null && session.isConnected()) {
-            session.disconnect();
-        }
-        if (channel != null && channel.isConnected()) {
-            channel.disconnect();
+    public void disconnect() throws GenericFileOperationFailedException {
+        lock.lock();
+        try {
+            if (session != null && session.isConnected()) {
+                session.disconnect();
+            }
+            if (channel != null && channel.isConnected()) {
+                channel.disconnect();
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized void forceDisconnect() throws 
GenericFileOperationFailedException {
+    public void forceDisconnect() throws GenericFileOperationFailedException {
+        lock.lock();
         try {
             if (session != null) {
                 session.disconnect();
@@ -516,6 +533,7 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
             // ensure these
             session = null;
             channel = null;
+            lock.unlock();
         }
     }
 
@@ -526,8 +544,9 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized boolean deleteFile(String name) throws 
GenericFileOperationFailedException {
+    public boolean deleteFile(String name) throws 
GenericFileOperationFailedException {
         LOG.debug("Deleting file: {}", name);
+        lock.lock();
         try {
             reconnectIfNecessary(null);
             channel.rm(name);
@@ -535,12 +554,15 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
         } catch (SftpException e) {
             LOG.debug("Cannot delete file {}: {}", name, e.getMessage(), e);
             throw new GenericFileOperationFailedException("Cannot delete file: 
" + name, e);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized boolean renameFile(String from, String to) throws 
GenericFileOperationFailedException {
+    public boolean renameFile(String from, String to) throws 
GenericFileOperationFailedException {
         LOG.debug("Renaming file: {} to: {}", from, to);
+        lock.lock();
         try {
             reconnectIfNecessary(null);
             // make use of the '/' separator because JSch expects this
@@ -551,64 +573,71 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
         } catch (SftpException e) {
             LOG.debug("Cannot rename file from: {} to: {}", from, to, e);
             throw new GenericFileOperationFailedException("Cannot rename file 
from: " + from + " to: " + to, e);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized boolean buildDirectory(String directory, boolean 
absolute) throws GenericFileOperationFailedException {
-        // must normalize directory first
-        directory = endpoint.getConfiguration().normalizePath(directory);
+    public boolean buildDirectory(String directory, boolean absolute) throws 
GenericFileOperationFailedException {
+        lock.lock();
+        try {
+            // must normalize directory first
+            directory = endpoint.getConfiguration().normalizePath(directory);
 
-        LOG.trace("buildDirectory({},{})", directory, absolute);
-        // ignore absolute as all dirs are relative with FTP
-        boolean success = false;
+            LOG.trace("buildDirectory({},{})", directory, absolute);
+            // ignore absolute as all dirs are relative with FTP
+            boolean success = false;
 
-        // whether to check for existing dir using CD or LS
-        boolean cdCheck = 
!this.endpoint.getConfiguration().isExistDirCheckUsingLs();
-        String originalDirectory = cdCheck ? getCurrentDirectory() : null;
+            // whether to check for existing dir using CD or LS
+            boolean cdCheck = 
!this.endpoint.getConfiguration().isExistDirCheckUsingLs();
+            String originalDirectory = cdCheck ? getCurrentDirectory() : null;
 
-        try {
-            // maybe the full directory already exists
             try {
-                if (cdCheck) {
-                    channel.cd(directory);
-                } else {
-                    // just do a fast listing
-                    channel.ls(directory, entry -> 
ChannelSftp.LsEntrySelector.BREAK);
-                }
-                success = true;
-            } catch (SftpException e) {
-                // ignore, we could not change directory so try to create it
-                // instead
-            }
-
-            if (!success) {
-                LOG.debug("Trying to build remote directory: {}", directory);
+                // maybe the full directory already exists
                 try {
-                    channel.mkdir(directory);
+                    if (cdCheck) {
+                        channel.cd(directory);
+                    } else {
+                        // just do a fast listing
+                        channel.ls(directory, entry -> 
ChannelSftp.LsEntrySelector.BREAK);
+                    }
                     success = true;
                 } catch (SftpException e) {
-                    // we are here if the server side doesn't create
-                    // intermediate folders
-                    // so create the folder one by one
-                    success = buildDirectoryChunks(directory);
+                    // ignore, we could not change directory so try to create 
it
+                    // instead
                 }
 
-                // only after successfully creating directory, we may set 
chmod on the file
-                if (success) {
-                    chmodOfDirectory(directory);
+                if (!success) {
+                    LOG.debug("Trying to build remote directory: {}", 
directory);
+                    try {
+                        channel.mkdir(directory);
+                        success = true;
+                    } catch (SftpException e) {
+                        // we are here if the server side doesn't create
+                        // intermediate folders
+                        // so create the folder one by one
+                        success = buildDirectoryChunks(directory);
+                    }
+
+                    // only after successfully creating directory, we may set 
chmod on the file
+                    if (success) {
+                        chmodOfDirectory(directory);
+                    }
                 }
-            }
 
-            // change back to original directory
-            if (originalDirectory != null) {
-                changeCurrentDirectory(originalDirectory);
+                // change back to original directory
+                if (originalDirectory != null) {
+                    changeCurrentDirectory(originalDirectory);
+                }
+            } catch (SftpException e) {
+                throw new GenericFileOperationFailedException("Cannot build 
directory: " + directory, e);
             }
-        } catch (SftpException e) {
-            throw new GenericFileOperationFailedException("Cannot build 
directory: " + directory, e);
-        }
 
-        return success;
+            return success;
+        } finally {
+            lock.unlock();
+        }
     }
 
     private boolean buildDirectoryChunks(String dirName) throws SftpException {
@@ -650,81 +679,89 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized String getCurrentDirectory() throws 
GenericFileOperationFailedException {
+    public String getCurrentDirectory() throws 
GenericFileOperationFailedException {
         LOG.trace("getCurrentDirectory()");
+        lock.lock();
         try {
             String answer = channel.pwd();
             LOG.trace("Current dir: {}", answer);
             return answer;
         } catch (SftpException e) {
             throw new GenericFileOperationFailedException("Cannot get current 
directory", e);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized void changeCurrentDirectory(String path) throws 
GenericFileOperationFailedException {
+    public void changeCurrentDirectory(String path) throws 
GenericFileOperationFailedException {
         LOG.trace("changeCurrentDirectory({})", path);
         if (ObjectHelper.isEmpty(path)) {
             return;
         }
+        lock.lock();
+        try {
 
-        // must compact path so SFTP server can traverse correctly, make use of
-        // the '/'
-        // separator because JSch expects this as the file separator even on
-        // Windows
-        String before = path;
-        char separatorChar = '/';
-        path = FileUtil.compactPath(path, separatorChar);
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Compacted path: {} -> {} using separator: {}", before, 
path, separatorChar);
-        }
+            // must compact path so SFTP server can traverse correctly, make 
use of
+            // the '/'
+            // separator because JSch expects this as the file separator even 
on
+            // Windows
+            String before = path;
+            char separatorChar = '/';
+            path = FileUtil.compactPath(path, separatorChar);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Compacted path: {} -> {} using separator: {}", 
before, path, separatorChar);
+            }
 
-        // not stepwise should change directory in one operation
-        if (!endpoint.getConfiguration().isStepwise()) {
-            doChangeDirectory(path);
-            return;
-        }
-        if (getCurrentDirectory().startsWith(path)) {
-            // extract the path segment relative to the target path and make
-            // sure it keeps the preceding '/' for the regex op
-            String p = getCurrentDirectory().substring(path.length() - 
(path.endsWith("/") ? 1 : 0));
-            if (p.isEmpty()) {
+            // not stepwise should change directory in one operation
+            if (!endpoint.getConfiguration().isStepwise()) {
+                doChangeDirectory(path);
                 return;
             }
-            // the first character must be '/' and hence removed
-            path = UP_DIR_PATTERN.matcher(p).replaceAll("/..").substring(1);
-        }
+            if (getCurrentDirectory().startsWith(path)) {
+                // extract the path segment relative to the target path and 
make
+                // sure it keeps the preceding '/' for the regex op
+                String p = getCurrentDirectory().substring(path.length() - 
(path.endsWith("/") ? 1 : 0));
+                if (p.isEmpty()) {
+                    return;
+                }
+                // the first character must be '/' and hence removed
+                path = 
UP_DIR_PATTERN.matcher(p).replaceAll("/..").substring(1);
+            }
 
-        // if it starts with the root path then a little special handling for
-        // that
-        if (FileUtil.hasLeadingSeparator(path)) {
-            // change to root path
-            if (!path.matches("^[a-zA-Z]:(//|\\\\).*$")) {
-                doChangeDirectory(path.substring(0, 1));
-                path = path.substring(1);
-            } else {
-                if (path.matches("^[a-zA-Z]:(//).*$")) {
-                    doChangeDirectory(path.substring(0, 3));
-                    path = path.substring(3);
-                } else if (path.matches("^[a-zA-Z]:(\\\\).*$")) {
-                    doChangeDirectory(path.substring(0, 4));
-                    path = path.substring(4);
+            // if it starts with the root path then a little special handling 
for
+            // that
+            if (FileUtil.hasLeadingSeparator(path)) {
+                // change to root path
+                if (!path.matches("^[a-zA-Z]:(//|\\\\).*$")) {
+                    doChangeDirectory(path.substring(0, 1));
+                    path = path.substring(1);
+                } else {
+                    if (path.matches("^[a-zA-Z]:(//).*$")) {
+                        doChangeDirectory(path.substring(0, 3));
+                        path = path.substring(3);
+                    } else if (path.matches("^[a-zA-Z]:(\\\\).*$")) {
+                        doChangeDirectory(path.substring(0, 4));
+                        path = path.substring(4);
+                    }
                 }
             }
-        }
 
-        // split into multiple dirs
-        final String[] dirs = path.split("/|\\\\");
+            // split into multiple dirs
+            final String[] dirs = path.split("/|\\\\");
 
-        if (dirs == null || dirs.length == 0) {
-            // path was just a relative single path
-            doChangeDirectory(path);
-            return;
-        }
+            if (dirs == null || dirs.length == 0) {
+                // path was just a relative single path
+                doChangeDirectory(path);
+                return;
+            }
 
-        // there are multiple dirs so do this in chunks
-        for (String dir : dirs) {
-            doChangeDirectory(dir);
+            // there are multiple dirs so do this in chunks
+            for (String dir : dirs) {
+                doChangeDirectory(dir);
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -741,32 +778,43 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized void changeToParentDirectory() throws 
GenericFileOperationFailedException {
+    public void changeToParentDirectory() throws 
GenericFileOperationFailedException {
         LOG.trace("changeToParentDirectory()");
-        String current = getCurrentDirectory();
+        lock.lock();
+        try {
+            String current = getCurrentDirectory();
 
-        String parent = FileUtil.compactPath(current + "/..");
-        // must start with absolute
-        if (!parent.startsWith("/")) {
-            parent = "/" + parent;
-        }
+            String parent = FileUtil.compactPath(current + "/..");
+            // must start with absolute
+            if (!parent.startsWith("/")) {
+                parent = "/" + parent;
+            }
 
-        changeCurrentDirectory(parent);
+            changeCurrentDirectory(parent);
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
-    public synchronized SftpRemoteFile[] listFiles() throws 
GenericFileOperationFailedException {
-        return listFiles(".");
+    public SftpRemoteFile[] listFiles() throws 
GenericFileOperationFailedException {
+        lock.lock();
+        try {
+            return listFiles(".");
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
-    public synchronized SftpRemoteFile[] listFiles(String path) throws 
GenericFileOperationFailedException {
+    public SftpRemoteFile[] listFiles(String path) throws 
GenericFileOperationFailedException {
         LOG.trace("Listing remote files from path {}", path);
         if (ObjectHelper.isEmpty(path)) {
             // list current directory if file path is not given
             path = ".";
         }
 
+        lock.lock();
         try {
             Vector<?> files = channel.ls(path);
 
@@ -775,33 +823,45 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
                     .toArray(SftpRemoteFileJCraft[]::new);
         } catch (SftpException e) {
             throw new GenericFileOperationFailedException("Cannot list 
directory: " + path, e);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized boolean retrieveFile(String name, Exchange exchange, 
long size)
+    public boolean retrieveFile(String name, Exchange exchange, long size)
             throws GenericFileOperationFailedException {
         LOG.trace("retrieveFile({})", name);
-        if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) {
-            // local work directory is configured so we should store file
-            // content as files in this local directory
-            return retrieveFileToFileInLocalWorkDirectory(name, exchange);
-        } else {
-            // store file content directory as stream on the body
-            return retrieveFileToStreamInBody(name, exchange);
+        lock.lock();
+        try {
+            if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) {
+                // local work directory is configured so we should store file
+                // content as files in this local directory
+                return retrieveFileToFileInLocalWorkDirectory(name, exchange);
+            } else {
+                // store file content directory as stream on the body
+                return retrieveFileToStreamInBody(name, exchange);
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized void releaseRetrievedFileResources(Exchange exchange) 
throws GenericFileOperationFailedException {
-        InputStream is = 
exchange.getIn().getHeader(FtpConstants.REMOTE_FILE_INPUT_STREAM, 
InputStream.class);
+    public void releaseRetrievedFileResources(Exchange exchange) throws 
GenericFileOperationFailedException {
+        lock.lock();
+        try {
+            InputStream is = 
exchange.getIn().getHeader(FtpConstants.REMOTE_FILE_INPUT_STREAM, 
InputStream.class);
 
-        if (is != null) {
-            try {
-                is.close();
-            } catch (IOException e) {
-                throw new GenericFileOperationFailedException(e.getMessage(), 
e);
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    throw new 
GenericFileOperationFailedException(e.getMessage(), e);
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -970,40 +1030,45 @@ public class SftpOperations implements 
RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized boolean storeFile(String name, Exchange exchange, long 
size)
+    public boolean storeFile(String name, Exchange exchange, long size)
             throws GenericFileOperationFailedException {
-        // must normalize name first
-        name = endpoint.getConfiguration().normalizePath(name);
+        lock.lock();
+        try {
+            // must normalize name first
+            name = endpoint.getConfiguration().normalizePath(name);
 
-        LOG.trace("storeFile({})", name);
+            LOG.trace("storeFile({})", name);
 
-        boolean answer;
-        String currentDir = null;
-        String path = FileUtil.onlyPath(name);
-        String targetName = name;
+            boolean answer;
+            String currentDir = null;
+            String path = FileUtil.onlyPath(name);
+            String targetName = name;
 
-        if (path != null && endpoint.getConfiguration().isStepwise()) {
-            // must remember current dir so we stay in that directory after
-            // the write
-            currentDir = getCurrentDirectory();
+            if (path != null && endpoint.getConfiguration().isStepwise()) {
+                // must remember current dir so we stay in that directory after
+                // the write
+                currentDir = getCurrentDirectory();
 
-            // change to path of name
-            changeCurrentDirectory(path);
+                // change to path of name
+                changeCurrentDirectory(path);
 
-            // the target name should be without path, as we have changed
-            // directory
-            targetName = FileUtil.stripPath(name);
-        }
+                // the target name should be without path, as we have changed
+                // directory
+                targetName = FileUtil.stripPath(name);
+            }
 
-        // store the file
-        answer = doStoreFile(name, targetName, exchange);
+            // store the file
+            answer = doStoreFile(name, targetName, exchange);
 
-        // change back to current directory if we changed directory
-        if (currentDir != null) {
-            changeCurrentDirectory(currentDir);
-        }
+            // change back to current directory if we changed directory
+            if (currentDir != null) {
+                changeCurrentDirectory(currentDir);
+            }
 
-        return answer;
+            return answer;
+        } finally {
+            lock.unlock();
+        }
     }
 
     private boolean doStoreFile(String name, String targetName, Exchange exchange) throws GenericFileOperationFailedException {
@@ -1088,44 +1153,48 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized boolean existsFile(String name) throws GenericFileOperationFailedException {
+    public boolean existsFile(String name) throws GenericFileOperationFailedException {
         LOG.trace("existsFile({})", name);
-        if (endpoint.isFastExistsCheck()) {
-            return fastExistsFile(name);
-        }
-        // check whether a file already exists
-        String directory = FileUtil.onlyPath(name);
-        if (directory == null) {
-            // assume current dir if no path could be extracted
-            directory = ".";
-        }
-        String onlyName = FileUtil.stripPath(name);
-
+        lock.lock();
         try {
-            @SuppressWarnings("rawtypes")
-            List files = channel.ls(directory);
-            // can return either null or an empty list depending on FTP servers
-            if (files == null) {
-                return false;
+            if (endpoint.isFastExistsCheck()) {
+                return fastExistsFile(name);
             }
-            for (Object file : files) {
-                ChannelSftp.LsEntry entry = (ChannelSftp.LsEntry) file;
-                String existing = entry.getFilename();
-                LOG.trace("Existing file: {}, target file: {}", existing, name);
-                existing = FileUtil.stripPath(existing);
-                if (existing != null && existing.equals(onlyName)) {
-                    return true;
-                }
+            // check whether a file already exists
+            String directory = FileUtil.onlyPath(name);
+            if (directory == null) {
+                // assume current dir if no path could be extracted
+                directory = ".";
             }
-            return false;
-        } catch (SftpException e) {
-            // or an exception can be thrown with id 2 which means file does not
-            // exists
-            if (ChannelSftp.SSH_FX_NO_SUCH_FILE == e.id) {
+            String onlyName = FileUtil.stripPath(name);
+
+            try {
+                @SuppressWarnings("rawtypes") List files = channel.ls(directory);
+                // can return either null or an empty list depending on FTP servers
+                if (files == null) {
+                    return false;
+                }
+                for (Object file : files) {
+                    ChannelSftp.LsEntry entry = (ChannelSftp.LsEntry) file;
+                    String existing = entry.getFilename();
+                    LOG.trace("Existing file: {}, target file: {}", existing, name);
+                    existing = FileUtil.stripPath(existing);
+                    if (existing != null && existing.equals(onlyName)) {
+                        return true;
+                    }
+                }
                 return false;
+            } catch (SftpException e) {
+                // or an exception can be thrown with id 2 which means file does not
+                // exists
+                if (ChannelSftp.SSH_FX_NO_SUCH_FILE == e.id) {
+                    return false;
+                }
+                // otherwise its a more serious error so rethrow
+                throw new GenericFileOperationFailedException(e.getMessage(), e);
             }
-            // otherwise its a more serious error so rethrow
-            throw new GenericFileOperationFailedException(e.getMessage(), e);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -1150,21 +1219,26 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> {
     }
 
     @Override
-    public synchronized boolean sendNoop() throws GenericFileOperationFailedException {
-        if (isConnected()) {
-            try {
-                session.sendKeepAliveMsg();
-                return true;
-            } catch (Exception e) {
-                LOG.debug("SFTP session was closed. Ignoring this exception.", e);
-                return false;
+    public boolean sendNoop() throws GenericFileOperationFailedException {
+        lock.lock();
+        try {
+            if (isConnected()) {
+                try {
+                    session.sendKeepAliveMsg();
+                    return true;
+                } catch (Exception e) {
+                    LOG.debug("SFTP session was closed. Ignoring this exception.", e);
+                    return false;
+                }
             }
+            return false;
+        } finally {
+            lock.unlock();
         }
-        return false;
     }
 
     @Override
-    public synchronized boolean sendSiteCommand(String command) throws GenericFileOperationFailedException {
+    public boolean sendSiteCommand(String command) throws GenericFileOperationFailedException {
         // is not implemented
         return true;
     }
diff --git a/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java b/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java
index 095b92c0657..9d391be7e50 100644
--- a/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java
+++ b/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java
@@ -451,4 +451,7 @@ public abstract class BaseService {
         return Holder.LOG;
     }
 
+    protected Lock getInternalLock() {
+        return lock;
+    }
 }

Reply via email to