This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-20199/remove-synchronized-blocks-from-d2f-components in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9bebcceff4f3a4311b526c728b825ab1fabaa1ca Author: Nicolas Filotto <[email protected]> AuthorDate: Wed Sep 4 08:32:27 2024 +0200 CAMEL-20199: Remove synchronized block from components D to F --- .../camel/component/dhis2/Dhis2Component.java | 5 +- .../camel/component/direct/DirectComponent.java | 40 +- .../component/disruptor/DisruptorComponent.java | 10 +- .../component/disruptor/DisruptorEndpoint.java | 10 +- .../component/disruptor/DisruptorReference.java | 151 +++--- .../camel/component/es/ElasticsearchProducer.java | 8 +- .../etcd3/cloud/Etcd3WatchServiceDiscovery.java | 9 +- .../component/etcd3/policy/Etcd3RoutePolicy.java | 56 +-- .../camel/component/file/FileOperations.java | 8 +- .../camel/component/file/GenericFileProducer.java | 2 +- .../file/cluster/FileLockClusterService.java | 24 +- .../file/cluster/FileLockClusterView.java | 13 +- .../component/freemarker/FreemarkerComponent.java | 57 ++- .../component/file/remote/SftpOperations.java | 504 ++++++++++++--------- .../apache/camel/support/service/BaseService.java | 3 + 15 files changed, 541 insertions(+), 359 deletions(-) diff --git a/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java b/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java index 345e125fbb6..6e8e026206e 100644 --- a/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java +++ b/components/camel-dhis2/camel-dhis2-component/src/main/java/org/apache/camel/component/dhis2/Dhis2Component.java @@ -66,11 +66,14 @@ public class Dhis2Component extends AbstractApiComponent<Dhis2ApiName, Dhis2Conf public Dhis2Client getClient(Dhis2Configuration endpointConfiguration) { if (endpointConfiguration.equals(this.configuration)) { - synchronized (this) { + lock.lock(); + try { if (this.dhis2Client == null) { this.dhis2Client = Dhis2ClientBuilder.newClient(endpointConfiguration.getBaseApiUrl(), endpointConfiguration.getUsername(), endpointConfiguration.getPassword()).build(); } + } finally { + lock.unlock(); } return this.dhis2Client; diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java index 403e3f65242..1ab1599c61a 100644 --- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java +++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java @@ -16,8 +16,13 @@ */ package org.apache.camel.component.direct; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; @@ -34,12 +39,14 @@ import org.apache.camel.util.StopWatch; public class DirectComponent extends DefaultComponent { // active consumers - private final Map<String, DirectConsumer> consumers = new HashMap<>(); + private final Lock consumersLock = new ReentrantLock(); + private final Condition consumersCondition = consumersLock.newCondition(); + private final Map<String, DirectConsumer> consumers = new ConcurrentHashMap<>(); // counter that is used for producers to keep track if any consumer was added/removed since they last checked // this is used for optimization to avoid each producer to get consumer for each message processed // (locking via synchronized, and then lookup in the map as the cost) // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled - private volatile int stateCounter; + private final AtomicInteger stateCounter = new AtomicInteger(); @Metadata(label = "producer", defaultValue = "true") private boolean block = true; @@ -89,33 +96,40 @@ public class DirectComponent extends DefaultComponent { } int getStateCounter() { - return stateCounter; + return stateCounter.get(); } public void addConsumer(String key, DirectConsumer consumer) { - synchronized (consumers) { + consumersLock.lock(); + try { if (consumers.putIfAbsent(key, consumer) != null) { throw new IllegalArgumentException( "Cannot add a 2nd consumer to the same endpoint: " + key + ". DirectEndpoint only allows one consumer."); } // state changed so inc counter - stateCounter++; - consumers.notifyAll(); + stateCounter.incrementAndGet(); + consumersCondition.signalAll(); + } finally { + consumersLock.unlock(); } } public void removeConsumer(String key, DirectConsumer consumer) { - synchronized (consumers) { + consumersLock.lock(); + try { consumers.remove(key, consumer); // state changed so inc counter - stateCounter++; - consumers.notifyAll(); + stateCounter.incrementAndGet(); + consumersCondition.signalAll(); + } finally { + consumersLock.unlock(); } } protected DirectConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException { - synchronized (consumers) { + consumersLock.lock(); + try { DirectConsumer answer = consumers.get(key); if (answer == null && block) { StopWatch watch = new StopWatch(); @@ -128,10 +142,12 @@ public class DirectComponent extends DefaultComponent { if (rem <= 0) { break; } - consumers.wait(rem); + consumersCondition.await(rem, TimeUnit.MILLISECONDS); } } return answer; + } finally { + consumersLock.unlock(); } } diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java index ae365320e85..1d8103297ed 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorComponent.java @@ -133,7 +133,8 @@ public class DisruptorComponent extends DefaultComponent { } sizeToUse = powerOfTwo(sizeToUse); - synchronized (this) { + lock.lock(); + try { DisruptorReference ref = getDisruptors().get(key); if (ref == null) { LOGGER.debug("Creating new disruptor for key {}", key); @@ -151,6 +152,8 @@ public class DisruptorComponent extends DefaultComponent { } return ref; + } finally { + lock.unlock(); } } @@ -171,8 +174,11 @@ public class DisruptorComponent extends DefaultComponent { @Override protected void doStop() throws Exception { - synchronized (this) { + lock.lock(); + try { getDisruptors().clear(); + } finally { + lock.unlock(); } super.doStop(); } diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java index 1d1328da242..fdc769ec82b 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorEndpoint.java @@ -284,7 +284,8 @@ public class DisruptorEndpoint extends DefaultEndpoint implements AsyncEndpoint, } void onStarted(final DisruptorConsumer consumer) throws Exception { - synchronized (this) { + lock.lock(); + try { // validate multiple consumers have been enabled is necessary if (!consumers.isEmpty() && !isMultipleConsumersSupported()) { throw new IllegalStateException( @@ -297,17 +298,22 @@ public class DisruptorEndpoint extends DefaultEndpoint implements AsyncEndpoint, LOGGER.debug("Tried to start Consumer {} on endpoint {} but it was already started", consumer, getEndpointUri()); } + } finally { + lock.unlock(); } } void onStopped(final DisruptorConsumer consumer) throws Exception { - synchronized (this) { + lock.lock(); + try { if (consumers.remove(consumer)) { LOGGER.debug("Stopping consumer {} on endpoint {}", consumer, getEndpointUri()); getDisruptor().reconfigure(); } else { LOGGER.debug("Tried to stop Consumer {} on endpoint {} but it was already stopped", consumer, getEndpointUri()); } + } finally { + lock.unlock(); } } diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java index ce09adfe3b5..ae466834554 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorReference.java @@ -32,7 +32,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicMarkableReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.RingBuffer; @@ -73,6 +75,7 @@ public class DisruptorReference { private final DisruptorWaitStrategy waitStrategy; private final Queue<Exchange> temporaryExchangeBuffer; + private final Lock lock = new ReentrantLock(); //access guarded by this private ExecutorService executor; @@ -144,11 +147,16 @@ public class DisruptorReference { ringBuffer.publish(sequence); } - public synchronized void reconfigure() throws Exception { - LOGGER.debug("Reconfiguring disruptor {}", this); - shutdownDisruptor(true); + public void reconfigure() throws Exception { + lock.lock(); + try { + LOGGER.debug("Reconfiguring disruptor {}", this); + shutdownDisruptor(true); - start(); + start(); + } finally { + lock.unlock(); + } } private void start() throws Exception { @@ -282,52 +290,62 @@ public class DisruptorReference { } } - private synchronized void shutdownDisruptor(boolean isReconfiguring) { - LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, isReconfiguring); - Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference(); - disruptor.set(null, isReconfiguring); - - if (currentDisruptor != null) { - //check if we had a blocking event handler to keep an empty disruptor 'busy' - if (handlers != null && handlers.length == 1 - && handlers[0] instanceof BlockingExchangeEventHandler blockingExchangeEventHandler) { - // yes we did, unblock it so we can get rid of our backlog, - // The eventhandler will empty its pending exchanges in our temporary buffer - blockingExchangeEventHandler.unblock(); - } + private void shutdownDisruptor(boolean isReconfiguring) { + lock.lock(); + try { + LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, isReconfiguring); + Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference(); + disruptor.set(null, isReconfiguring); + + if (currentDisruptor != null) { + //check if we had a blocking event handler to keep an empty disruptor 'busy' + if (handlers != null && handlers.length == 1 + && handlers[0] instanceof BlockingExchangeEventHandler blockingExchangeEventHandler) { + // yes we did, unblock it so we can get rid of our backlog, + // The eventhandler will empty its pending exchanges in our temporary buffer + blockingExchangeEventHandler.unblock(); + } - currentDisruptor.shutdown(); - - //they have already been given a trigger to halt when they are done by shutting down the disruptor - //we do however want to await their completion before they are scheduled to process events from the new - for (final LifecycleAwareExchangeEventHandler eventHandler : handlers) { - boolean eventHandlerFinished = false; - //the disruptor is now empty and all consumers are either done or busy processing their last exchange - while (!eventHandlerFinished) { - try { - //The disruptor shutdown command executed above should have triggered a halt signal to all - //event processors which, in their death, should notify our event handlers. They respond by - //switching a latch and we want to await that latch here to make sure they are done. - if (!eventHandler.awaitStopped(10, TimeUnit.SECONDS)) { - //we wait for a relatively long, but limited amount of time to prevent an application using - //this component from hanging indefinitely - //Please report a bug if you can repruduce this - LOGGER.error("Disruptor/event handler failed to shut down properly, PLEASE REPORT"); + currentDisruptor.shutdown(); + + //they have already been given a trigger to halt when they are done by shutting down the disruptor + //we do however want to await their completion before they are scheduled to process events from the new + for (final LifecycleAwareExchangeEventHandler eventHandler : handlers) { + boolean eventHandlerFinished = false; + //the disruptor is now empty and all consumers are either done or busy processing their last exchange + while (!eventHandlerFinished) { + try { + //The disruptor shutdown command executed above should have triggered a halt signal to all + //event processors which, in their death, should notify our event handlers. They respond by + //switching a latch and we want to await that latch here to make sure they are done. + if (!eventHandler.awaitStopped(10, TimeUnit.SECONDS)) { + //we wait for a relatively long, but limited amount of time to prevent an application using + //this component from hanging indefinitely + //Please report a bug if you can repruduce this + LOGGER.error("Disruptor/event handler failed to shut down properly, PLEASE REPORT"); + } + eventHandlerFinished = true; + } catch (InterruptedException e) { + LOGGER.info("Interrupted while waiting for the shutdown to complete"); + Thread.currentThread().interrupt(); } - eventHandlerFinished = true; - } catch (InterruptedException e) { - LOGGER.info("Interrupted while waiting for the shutdown to complete"); - Thread.currentThread().interrupt(); } } - } - handlers = new LifecycleAwareExchangeEventHandler[0]; + handlers = new LifecycleAwareExchangeEventHandler[0]; + } + } finally { + lock.unlock(); } } - private synchronized void shutdownExecutor() { - resizeThreadPoolExecutor(0); + private void shutdownExecutor() { + lock.lock(); + try { + resizeThreadPoolExecutor(0); + } finally { + lock.unlock(); + } } public String getName() { @@ -361,28 +379,43 @@ public class DisruptorReference { return temporaryExchangeBuffer.size(); } - public synchronized void addEndpoint(final DisruptorEndpoint disruptorEndpoint) { - LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint); - endpoints.add(disruptorEndpoint); - LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, endpoints.size()); + public void addEndpoint(final DisruptorEndpoint disruptorEndpoint) { + lock.lock(); + try { + LOGGER.debug("Adding Endpoint: {}", disruptorEndpoint); + endpoints.add(disruptorEndpoint); + LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, endpoints.size()); + } finally { + lock.unlock(); + } } - public synchronized void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) { - LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint); - if (getEndpointCount() == 1) { - LOGGER.debug("Last Endpoint removed, shutdown disruptor"); - //Shutdown our disruptor - shutdownDisruptor(false); - - //As there are no endpoints dependent on this Disruptor, we may also shutdown our executor - shutdownExecutor(); + public void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) { + lock.lock(); + try { + LOGGER.debug("Removing Endpoint: {}", disruptorEndpoint); + if (getEndpointCount() == 1) { + LOGGER.debug("Last Endpoint removed, shutdown disruptor"); + //Shutdown our disruptor + shutdownDisruptor(false); + + //As there are no endpoints dependent on this Disruptor, we may also shutdown our executor + shutdownExecutor(); + } + endpoints.remove(disruptorEndpoint); + LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, getEndpointCount()); + } finally { + lock.unlock(); } - endpoints.remove(disruptorEndpoint); - LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, getEndpointCount()); } - public synchronized int getEndpointCount() { - return endpoints.size(); + public int getEndpointCount() { + lock.lock(); + try { + return endpoints.size(); + } finally { + lock.unlock(); + } } @Override diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java index 64ee8bb3157..59f14813128 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java @@ -21,6 +21,8 @@ import java.security.KeyStore; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; @@ -80,7 +82,6 @@ class ElasticsearchProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class); protected final ElasticsearchConfiguration configuration; - private final Object mutex = new Object(); private volatile RestClient client; private Sniffer sniffer; @@ -477,7 +478,8 @@ class ElasticsearchProducer extends DefaultAsyncProducer { private void startClient() { if (client == null) { - synchronized (mutex) { + lock.lock(); + try { if (client == null) { LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName()); if (configuration.getHostAddressesList() != null @@ -487,6 +489,8 @@ class ElasticsearchProducer extends DefaultAsyncProducer { LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster"); } } + } finally { + lock.unlock(); } } } diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java index 08947a3c5ba..ee1bf2e89a6 100644 --- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java +++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/cloud/Etcd3WatchServiceDiscovery.java @@ -75,10 +75,6 @@ public class Etcd3WatchServiceDiscovery extends Etcd3ServiceDiscovery * The current watcher used to watch the changes of the service definitions. */ private final AtomicReference<Watch.Watcher> watcher = new AtomicReference<>(); - /** - * The mutex used to prevent concurrent load of the list of service definitions. - */ - private final Object mutex = new Object(); /** * Construct a {@code Etcd3WatchServiceDiscovery} with the given configuration. @@ -109,12 +105,15 @@ public class Etcd3WatchServiceDiscovery extends Etcd3ServiceDiscovery public List<ServiceDefinition> getServices(String name) { List<ServiceDefinition> servers = allServices; if (servers == null) { - synchronized (mutex) { + lock.lock(); + try { servers = allServices; if (servers == null) { servers = reloadServices(); doWatch(); } + } finally { + lock.unlock(); } } diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java index 76cd607dee1..8f38719e71a 100644 --- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java +++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/policy/Etcd3RoutePolicy.java @@ -62,10 +62,6 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext * The logger */ private static final Logger LOGGER = LoggerFactory.getLogger(Etcd3RoutePolicy.class); - /** - * The mutex used to prevent concurrent access to {@code suspendedRoutes}. - */ - private final Object mutex = new Object(); /** * The flag indicating whether the current node is a leader. */ @@ -173,15 +169,21 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext @Override public void onStop(Route route) { - synchronized (mutex) { + lock.lock(); + try { suspendedRoutes.remove(route); + } finally { + lock.unlock(); } } @Override - public synchronized void onSuspend(Route route) { - synchronized (mutex) { + public void onSuspend(Route route) { + lock.lock(); + try { suspendedRoutes.remove(route); + } finally { + lock.unlock(); } } @@ -249,16 +251,17 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext * @param route the route for which the consumer should be stopped. */ private void stopConsumer(Route route) { - synchronized (mutex) { - try { - if (!suspendedRoutes.contains(route)) { - LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); - stopConsumer(route.getConsumer()); - suspendedRoutes.add(route); - } - } catch (Exception e) { - handleException(e); + lock.lock(); + try { + if (!suspendedRoutes.contains(route)) { + LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); + stopConsumer(route.getConsumer()); + suspendedRoutes.add(route); } + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); } } @@ -266,17 +269,18 @@ public class Etcd3RoutePolicy extends RoutePolicySupport implements CamelContext * Start all the consumers that have been stopped. */ private void startAllStoppedConsumers() { - synchronized (mutex) { - try { - for (Route suspendedRoute : suspendedRoutes) { - LOGGER.debug("Starting consumer for {} ({})", suspendedRoute.getId(), suspendedRoute.getConsumer()); - startConsumer(suspendedRoute.getConsumer()); - } - - suspendedRoutes.clear(); - } catch (Exception e) { - handleException(e); + lock.lock(); + try { + for (Route suspendedRoute : suspendedRoutes) { + LOGGER.debug("Starting consumer for {} ({})", suspendedRoute.getId(), suspendedRoute.getConsumer()); + startConsumer(suspendedRoute.getConsumer()); } + + suspendedRoutes.clear(); + } catch (Exception e) { + handleException(e); + } finally { + lock.unlock(); } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java index f08c96485ef..74bac17ccbf 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java @@ -35,6 +35,8 @@ import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.Date; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; @@ -53,6 +55,7 @@ import static org.apache.camel.component.file.GenericFileHelper.asExclusiveReadL */ public class FileOperations implements GenericFileOperations<File> { private static final Logger LOG = LoggerFactory.getLogger(FileOperations.class); + private final Lock lock = new ReentrantLock(); private FileEndpoint endpoint; public FileOperations() { @@ -189,7 +192,8 @@ public class FileOperations implements GenericFileOperations<File> { // We need to make sure that this is thread-safe and only one thread // tries to create the path directory at the same time. - synchronized (this) { + lock.lock(); + try { if (path.isDirectory() && path.exists()) { // the directory already exists return true; @@ -197,6 +201,8 @@ public class FileOperations implements GenericFileOperations<File> { LOG.trace("Building directory: {}", path); return buildDirectory(path, endpoint.getDirectoryPermissions(), absolute); } + } finally { + lock.unlock(); } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java index 6d1bed9e1bf..da7f8101fd3 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java @@ -47,7 +47,7 @@ public class GenericFileProducer<T> extends DefaultProducer { protected GenericFileOperations<T> operations; // assume writing to 100 different files concurrently at most for the same // file producer - private final Map<String, Lock> locks = Collections.synchronizedMap(LRUCacheFactory.newLRUCache(100)); + private final Map<String, Lock> locks = LRUCacheFactory.newLRUCache(100); protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) { super(endpoint); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java index fc5e949cf87..591ad0690f5 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java @@ -18,9 +18,11 @@ package org.apache.camel.component.file.cluster; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import org.apache.camel.CamelContext; import org.apache.camel.support.cluster.AbstractCamelClusterService; +import org.apache.camel.support.service.BaseService; import org.apache.camel.util.ObjectHelper; public class FileLockClusterService extends AbstractCamelClusterService<FileLockClusterView> { @@ -125,15 +127,21 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock } } - synchronized ScheduledExecutorService getExecutor() { - if (executor == null) { - // Camel context should be set at this stage. - final CamelContext context = ObjectHelper.notNull(getCamelContext(), "CamelContext"); + ScheduledExecutorService getExecutor() { + Lock internalLock = getInternalLock(); + internalLock.lock(); + try { + if (executor == null) { + // Camel context should be set at this stage. + final CamelContext context = ObjectHelper.notNull(getCamelContext(), "CamelContext"); - executor = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, - "FileLockClusterService-" + getId()); - } + executor = context.getExecutorServiceManager() + .newSingleThreadScheduledExecutor(this, "FileLockClusterService-" + getId()); + } - return executor; + return executor; + } finally { + internalLock.unlock(); + } } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java index 6c679a657e7..a2e9ad9dfd7 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterView.java @@ -29,6 +29,8 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.cluster.CamelClusterMember; import org.apache.camel.support.cluster.AbstractCamelClusterView; @@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory; public class FileLockClusterView extends AbstractCamelClusterView { private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterView.class); + private static final Lock LOCK = new ReentrantLock(); private final ClusterMember localMember; private final Path path; private RandomAccessFile lockFile; @@ -134,7 +137,8 @@ public class FileLockClusterView extends AbstractCamelClusterView { return; } - synchronized (FileLockClusterView.this) { + LOCK.lock(); + try { if (lock != null) { LOGGER.info("Lock on file {} lost (lock={})", path, lock); fireLeadershipChangedEvent((CamelClusterMember) null); @@ -152,6 +156,8 @@ public class FileLockClusterView extends AbstractCamelClusterView { } else { LOGGER.debug("Lock on file {} not acquired ", path); } + } finally { + LOCK.unlock(); } } catch (OverlappingFileLockException e) { reason = new IOException(e); @@ -169,8 +175,11 @@ public class FileLockClusterView extends AbstractCamelClusterView { private final class ClusterMember implements CamelClusterMember { @Override public boolean isLeader() { - synchronized (FileLockClusterView.this) { + LOCK.lock(); + try { return lock != null && lock.isValid(); + } finally { + LOCK.unlock(); } } diff --git a/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java b/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java index d7ed57bccf8..c3f3a19efd7 100644 --- a/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java +++ b/components/camel-freemarker/src/main/java/org/apache/camel/component/freemarker/FreemarkerComponent.java @@ -85,24 +85,30 @@ public class FreemarkerComponent extends DefaultComponent { return endpoint; } - public synchronized Configuration getConfiguration() { - if (configuration == null) { - configuration = new Configuration(Configuration.VERSION_2_3_32); - configuration.setLocalizedLookup(isLocalizedLookup()); - configuration.setTemplateLoader(new URLTemplateLoader() { - @Override - protected URL getURL(String name) { - try { - return ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext(), name); - } catch (Exception e) { - // freemarker prefers to ask for locale first (eg xxx_en_GB, xxX_en), and then fallback without locale - // so we should return null to signal the resource could not be found - return null; + public Configuration getConfiguration() { + lock.lock(); + try { + if (configuration == null) { + configuration = new Configuration(Configuration.VERSION_2_3_32); + configuration.setLocalizedLookup(isLocalizedLookup()); + configuration.setTemplateLoader(new URLTemplateLoader() { + + @Override + protected URL getURL(String name) { + try { + return ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext(), name); + } catch (Exception e) { + // freemarker prefers to ask for locale first (eg xxx_en_GB, xxX_en), and then fallback without locale + // so we should return null to signal the resource could not be found + return null; + } } - } - }); + }); + } + return (Configuration) configuration.clone(); + } finally { + lock.unlock(); } - return (Configuration) configuration.clone(); } /** @@ -150,14 +156,19 @@ public class FreemarkerComponent extends DefaultComponent { this.localizedLookup = localizedLookup; } - private synchronized Configuration getNoCacheConfiguration() { - if (noCacheConfiguration == null) { - // create a clone of the regular configuration - noCacheConfiguration = (Configuration) getConfiguration().clone(); - // set this one to not use cache - noCacheConfiguration.setCacheStorage(new NullCacheStorage()); + private Configuration getNoCacheConfiguration() { + lock.lock(); + try { + if (noCacheConfiguration == null) { + // create a clone of the regular configuration + noCacheConfiguration = (Configuration) getConfiguration().clone(); + // set this one to not use cache + noCacheConfiguration.setCacheStorage(new NullCacheStorage()); + } + return noCacheConfiguration; + } finally { + lock.unlock(); } - return noCacheConfiguration; } } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java index 7b4d77700b3..cda1af1413b 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java @@ -34,6 +34,8 @@ import java.util.Base64; import java.util.Hashtable; import java.util.List; import java.util.Vector; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import com.jcraft.jsch.ChannelSftp; @@ -83,6 +85,7 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { private SftpEndpoint endpoint; private ChannelSftp channel; private Session session; + private final Lock lock = new ReentrantLock(); private static class TaskPayload { final RemoteFileConfiguration configuration; @@ -117,32 +120,35 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized boolean connect(RemoteFileConfiguration configuration, Exchange exchange) + public boolean connect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException { - if (isConnected()) { - // already connected - return true; - } + lock.lock(); + try { + if (isConnected()) { + // already connected + return true; + } - BlockingTask task = Tasks - .foregroundTask() - .withBudget(Budgets.iterationBudget() - .withMaxIterations(Budgets.atLeastOnce(endpoint.getMaximumReconnectAttempts())) - .withInterval(Duration.ofMillis(endpoint.getReconnectDelay())) - .build()) - .build(); + BlockingTask task = Tasks.foregroundTask() + .withBudget(Budgets.iterationBudget() + .withMaxIterations(Budgets.atLeastOnce(endpoint.getMaximumReconnectAttempts())) + .withInterval(Duration.ofMillis(endpoint.getReconnectDelay())) + .build()) + .build(); - TaskPayload payload = new TaskPayload(configuration); + TaskPayload payload = new TaskPayload(configuration); - if (!task.run(this::tryConnect, payload)) { - throw new GenericFileOperationFailedException( - "Cannot connect to " + configuration.remoteServerInformation(), - payload.exception); - } + if (!task.run(this::tryConnect, payload)) { + throw new GenericFileOperationFailedException("Cannot connect to " + configuration.remoteServerInformation(), + payload.exception); + } - configureBulkRequests(); + configureBulkRequests(); - return true; + return true; + } finally { + lock.unlock(); + } } private boolean tryConnect(TaskPayload payload) { @@ -489,22 +495,33 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized boolean isConnected() throws GenericFileOperationFailedException { - return session != null && session.isConnected() && channel != null && channel.isConnected(); + public boolean isConnected() throws GenericFileOperationFailedException { + lock.lock(); + try { + return session != null && session.isConnected() && channel != null && channel.isConnected(); + } finally { + lock.unlock(); + } } @Override - public synchronized void disconnect() throws GenericFileOperationFailedException { - if (session != null && session.isConnected()) { - session.disconnect(); - } - if (channel != null && channel.isConnected()) { - channel.disconnect(); + public void disconnect() throws GenericFileOperationFailedException { + lock.lock(); + try { + if (session != null && session.isConnected()) { + session.disconnect(); + } + if (channel != null && channel.isConnected()) { + channel.disconnect(); + } + } finally { + lock.unlock(); } } @Override - public synchronized void forceDisconnect() throws GenericFileOperationFailedException { + public void forceDisconnect() throws GenericFileOperationFailedException { + lock.lock(); try { if (session != null) { session.disconnect(); @@ -516,6 +533,7 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { // ensure these session = null; channel = null; + lock.unlock(); } } @@ -526,8 +544,9 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized boolean deleteFile(String name) throws GenericFileOperationFailedException { + public boolean deleteFile(String name) throws GenericFileOperationFailedException { LOG.debug("Deleting file: {}", name); + lock.lock(); try { reconnectIfNecessary(null); channel.rm(name); @@ -535,12 +554,15 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } catch (SftpException e) { LOG.debug("Cannot delete file {}: {}", name, e.getMessage(), e); throw new GenericFileOperationFailedException("Cannot delete file: " + name, e); + } finally { + lock.unlock(); } } @Override - public synchronized boolean renameFile(String from, String to) throws GenericFileOperationFailedException { + public boolean renameFile(String from, String to) throws GenericFileOperationFailedException { LOG.debug("Renaming file: {} to: {}", from, to); + lock.lock(); try { reconnectIfNecessary(null); // make use of the '/' separator because JSch expects this @@ -551,64 +573,71 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } catch (SftpException e) { LOG.debug("Cannot rename file from: {} to: {}", from, to, e); throw new GenericFileOperationFailedException("Cannot rename file from: " + from + " to: " + to, e); + } finally { + lock.unlock(); } } @Override - public synchronized boolean buildDirectory(String directory, boolean absolute) throws GenericFileOperationFailedException { - // must normalize directory first - directory = endpoint.getConfiguration().normalizePath(directory); + public boolean buildDirectory(String directory, boolean absolute) throws GenericFileOperationFailedException { + lock.lock(); + try { + // must normalize directory first + directory = endpoint.getConfiguration().normalizePath(directory); - LOG.trace("buildDirectory({},{})", directory, absolute); - // ignore absolute as all dirs are relative with FTP - boolean success = false; + LOG.trace("buildDirectory({},{})", directory, absolute); + // ignore absolute as all dirs are relative with FTP + boolean success = false; - // whether to check for existing dir using CD or LS - boolean cdCheck = !this.endpoint.getConfiguration().isExistDirCheckUsingLs(); - String originalDirectory = cdCheck ? getCurrentDirectory() : null; + // whether to check for existing dir using CD or LS + boolean cdCheck = !this.endpoint.getConfiguration().isExistDirCheckUsingLs(); + String originalDirectory = cdCheck ? getCurrentDirectory() : null; - try { - // maybe the full directory already exists try { - if (cdCheck) { - channel.cd(directory); - } else { - // just do a fast listing - channel.ls(directory, entry -> ChannelSftp.LsEntrySelector.BREAK); - } - success = true; - } catch (SftpException e) { - // ignore, we could not change directory so try to create it - // instead - } - - if (!success) { - LOG.debug("Trying to build remote directory: {}", directory); + // maybe the full directory already exists try { - channel.mkdir(directory); + if (cdCheck) { + channel.cd(directory); + } else { + // just do a fast listing + channel.ls(directory, entry -> ChannelSftp.LsEntrySelector.BREAK); + } success = true; } catch (SftpException e) { - // we are here if the server side doesn't create - // intermediate folders - // so create the folder one by one - success = buildDirectoryChunks(directory); + // ignore, we could not change directory so try to create it + // instead } - // only after successfully creating directory, we may set chmod on the file - if (success) { - chmodOfDirectory(directory); + if (!success) { + LOG.debug("Trying to build remote directory: {}", directory); + try { + channel.mkdir(directory); + success = true; + } catch (SftpException e) { + // we are here if the server side doesn't create + // intermediate folders + // so create the folder one by one + success = buildDirectoryChunks(directory); + } + + // only after successfully creating directory, we may set chmod on the file + if (success) { + chmodOfDirectory(directory); + } } - } - // change back to original directory - if (originalDirectory != null) { - changeCurrentDirectory(originalDirectory); + // change back to original directory + if (originalDirectory != null) { + changeCurrentDirectory(originalDirectory); + } + } catch (SftpException e) { + throw new GenericFileOperationFailedException("Cannot build directory: " + directory, e); } - } catch (SftpException e) { - throw new GenericFileOperationFailedException("Cannot build directory: " + directory, e); - } - return success; + return success; + } finally { + lock.unlock(); + } } private boolean buildDirectoryChunks(String dirName) throws SftpException { @@ -650,81 +679,89 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized String getCurrentDirectory() throws GenericFileOperationFailedException { + public String getCurrentDirectory() throws GenericFileOperationFailedException { LOG.trace("getCurrentDirectory()"); + lock.lock(); try { String answer = channel.pwd(); LOG.trace("Current dir: {}", answer); return answer; } catch (SftpException e) { throw new GenericFileOperationFailedException("Cannot get current directory", e); + } finally { + lock.unlock(); } } @Override - public synchronized void changeCurrentDirectory(String path) throws GenericFileOperationFailedException { + public void changeCurrentDirectory(String path) throws GenericFileOperationFailedException { LOG.trace("changeCurrentDirectory({})", path); if (ObjectHelper.isEmpty(path)) { return; } + lock.lock(); + try { - // must compact path so SFTP server can traverse correctly, make use of - // the '/' - // separator because JSch expects this as the file separator even on - // Windows - String before = path; - char separatorChar = '/'; - path = FileUtil.compactPath(path, separatorChar); - if (LOG.isTraceEnabled()) { - LOG.trace("Compacted path: {} -> {} using separator: {}", before, path, separatorChar); - } + // must compact path so SFTP server can traverse correctly, make use of + // the '/' + // separator because JSch expects this as the file separator even on + // Windows + String before = path; + char separatorChar = '/'; + path = FileUtil.compactPath(path, separatorChar); + if (LOG.isTraceEnabled()) { + LOG.trace("Compacted path: {} -> {} using separator: {}", before, path, separatorChar); + } - // not stepwise should change directory in one operation - if (!endpoint.getConfiguration().isStepwise()) { - doChangeDirectory(path); - return; - } - if (getCurrentDirectory().startsWith(path)) { - // extract the path segment relative to the target path and make - // sure it keeps the preceding '/' for the regex op - String p = getCurrentDirectory().substring(path.length() - (path.endsWith("/") ? 1 : 0)); - if (p.isEmpty()) { + // not stepwise should change directory in one operation + if (!endpoint.getConfiguration().isStepwise()) { + doChangeDirectory(path); return; } - // the first character must be '/' and hence removed - path = UP_DIR_PATTERN.matcher(p).replaceAll("/..").substring(1); - } + if (getCurrentDirectory().startsWith(path)) { + // extract the path segment relative to the target path and make + // sure it keeps the preceding '/' for the regex op + String p = getCurrentDirectory().substring(path.length() - (path.endsWith("/") ? 1 : 0)); + if (p.isEmpty()) { + return; + } + // the first character must be '/' and hence removed + path = UP_DIR_PATTERN.matcher(p).replaceAll("/..").substring(1); + } - // if it starts with the root path then a little special handling for - // that - if (FileUtil.hasLeadingSeparator(path)) { - // change to root path - if (!path.matches("^[a-zA-Z]:(//|\\\\).*$")) { - doChangeDirectory(path.substring(0, 1)); - path = path.substring(1); - } else { - if (path.matches("^[a-zA-Z]:(//).*$")) { - doChangeDirectory(path.substring(0, 3)); - path = path.substring(3); - } else if (path.matches("^[a-zA-Z]:(\\\\).*$")) { - doChangeDirectory(path.substring(0, 4)); - path = path.substring(4); + // if it starts with the root path then a little special handling for + // that + if (FileUtil.hasLeadingSeparator(path)) { + // change to root path + if (!path.matches("^[a-zA-Z]:(//|\\\\).*$")) { + doChangeDirectory(path.substring(0, 1)); + path = path.substring(1); + } else { + if (path.matches("^[a-zA-Z]:(//).*$")) { + doChangeDirectory(path.substring(0, 3)); + path = path.substring(3); + } else if (path.matches("^[a-zA-Z]:(\\\\).*$")) { + doChangeDirectory(path.substring(0, 4)); + path = path.substring(4); + } } } - } - // split into multiple dirs - final String[] dirs = path.split("/|\\\\"); + // split into multiple dirs + final String[] dirs = path.split("/|\\\\"); - if (dirs == null || dirs.length == 0) { - // path was just a relative single path - doChangeDirectory(path); - return; - } + if (dirs == null || dirs.length == 0) { + // path was just a relative single path + doChangeDirectory(path); + return; + } - // there are multiple dirs so do this in chunks - for (String dir : dirs) { - doChangeDirectory(dir); + // there are multiple dirs so do this in chunks + for (String dir : dirs) { + doChangeDirectory(dir); + } + } finally { + lock.unlock(); } } @@ -741,32 +778,43 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized void changeToParentDirectory() throws GenericFileOperationFailedException { + public void changeToParentDirectory() throws GenericFileOperationFailedException { LOG.trace("changeToParentDirectory()"); - String current = getCurrentDirectory(); + lock.lock(); + try { + String current = getCurrentDirectory(); - String parent = FileUtil.compactPath(current + "/.."); - // must start with absolute - if (!parent.startsWith("/")) { - parent = "/" + parent; - } + String parent = FileUtil.compactPath(current + "/.."); + // must start with absolute + if (!parent.startsWith("/")) { + parent = "/" + parent; + } - changeCurrentDirectory(parent); + changeCurrentDirectory(parent); + } finally { + lock.unlock(); + } } @Override - public synchronized SftpRemoteFile[] listFiles() throws GenericFileOperationFailedException { - return listFiles("."); + public SftpRemoteFile[] listFiles() throws GenericFileOperationFailedException { + lock.lock(); + try { + return listFiles("."); + } finally { + lock.unlock(); + } } @Override - public synchronized SftpRemoteFile[] listFiles(String path) throws GenericFileOperationFailedException { + public SftpRemoteFile[] listFiles(String path) throws GenericFileOperationFailedException { LOG.trace("Listing remote files from path {}", path); if (ObjectHelper.isEmpty(path)) { // list current directory if file path is not given path = "."; } + lock.lock(); try { Vector<?> files = channel.ls(path); @@ -775,33 +823,45 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { .toArray(SftpRemoteFileJCraft[]::new); } catch (SftpException e) { throw new GenericFileOperationFailedException("Cannot list directory: " + path, e); + } finally { + lock.unlock(); } } @Override - public synchronized boolean retrieveFile(String name, Exchange exchange, long size) + public boolean retrieveFile(String name, Exchange exchange, long size) throws GenericFileOperationFailedException { LOG.trace("retrieveFile({})", name); - if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) { - // local work directory is configured so we should store file - // content as files in this local directory - return retrieveFileToFileInLocalWorkDirectory(name, exchange); - } else { - // store file content directory as stream on the body - return retrieveFileToStreamInBody(name, exchange); + lock.lock(); + try { + if (ObjectHelper.isNotEmpty(endpoint.getLocalWorkDirectory())) { + // local work directory is configured so we should store file + // content as files in this local directory + return retrieveFileToFileInLocalWorkDirectory(name, exchange); + } else { + // store file content directory as stream on the body + return retrieveFileToStreamInBody(name, exchange); + } + } finally { + lock.unlock(); } } @Override - public synchronized void releaseRetrievedFileResources(Exchange exchange) throws GenericFileOperationFailedException { - InputStream is = exchange.getIn().getHeader(FtpConstants.REMOTE_FILE_INPUT_STREAM, InputStream.class); + public void releaseRetrievedFileResources(Exchange exchange) throws GenericFileOperationFailedException { + lock.lock(); + try { + InputStream is = exchange.getIn().getHeader(FtpConstants.REMOTE_FILE_INPUT_STREAM, InputStream.class); - if (is != null) { - try { - is.close(); - } catch (IOException e) { - throw new GenericFileOperationFailedException(e.getMessage(), e); + if (is != null) { + try { + is.close(); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } } + } finally { + lock.unlock(); } } @@ -970,40 +1030,45 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized boolean storeFile(String name, Exchange exchange, long size) + public boolean storeFile(String name, Exchange exchange, long size) throws GenericFileOperationFailedException { - // must normalize name first - name = endpoint.getConfiguration().normalizePath(name); + lock.lock(); + try { + // must normalize name first + name = endpoint.getConfiguration().normalizePath(name); - LOG.trace("storeFile({})", name); + LOG.trace("storeFile({})", name); - boolean answer; - String currentDir = null; - String path = FileUtil.onlyPath(name); - String targetName = name; + boolean answer; + String currentDir = null; + String path = FileUtil.onlyPath(name); + String targetName = name; - if (path != null && endpoint.getConfiguration().isStepwise()) { - // must remember current dir so we stay in that directory after - // the write - currentDir = getCurrentDirectory(); + if (path != null && endpoint.getConfiguration().isStepwise()) { + // must remember current dir so we stay in that directory after + // the write + currentDir = getCurrentDirectory(); - // change to path of name - changeCurrentDirectory(path); + // change to path of name + changeCurrentDirectory(path); - // the target name should be without path, as we have changed - // directory - targetName = FileUtil.stripPath(name); - } + // the target name should be without path, as we have changed + // directory + targetName = FileUtil.stripPath(name); + } - // store the file - answer = doStoreFile(name, targetName, exchange); + // store the file + answer = doStoreFile(name, targetName, exchange); - // change back to current directory if we changed directory - if (currentDir != null) { - changeCurrentDirectory(currentDir); - } + // change back to current directory if we changed directory + if (currentDir != null) { + changeCurrentDirectory(currentDir); + } - return answer; + return answer; + } finally { + lock.unlock(); + } } private boolean doStoreFile(String name, String targetName, Exchange exchange) throws GenericFileOperationFailedException { @@ -1088,44 +1153,48 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized boolean existsFile(String name) throws GenericFileOperationFailedException { + public boolean existsFile(String name) throws GenericFileOperationFailedException { LOG.trace("existsFile({})", name); - if (endpoint.isFastExistsCheck()) { - return fastExistsFile(name); - } - // check whether a file already exists - String directory = FileUtil.onlyPath(name); - if (directory == null) { - // assume current dir if no path could be extracted - directory = "."; - } - String onlyName = FileUtil.stripPath(name); - + lock.lock(); try { - @SuppressWarnings("rawtypes") - List files = channel.ls(directory); - // can return either null or an empty list depending on FTP servers - if (files == null) { - return false; + if (endpoint.isFastExistsCheck()) { + return fastExistsFile(name); } - for (Object file : files) { - ChannelSftp.LsEntry entry = (ChannelSftp.LsEntry) file; - String existing = entry.getFilename(); - LOG.trace("Existing file: {}, target file: {}", existing, name); - existing = FileUtil.stripPath(existing); - if (existing != null && existing.equals(onlyName)) { - return true; - } + // check whether a file already exists + String directory = FileUtil.onlyPath(name); + if (directory == null) { + // assume current dir if no path could be extracted + directory = "."; } - return false; - } catch (SftpException e) { - // or an exception can be thrown with id 2 which means file does not - // exists - if (ChannelSftp.SSH_FX_NO_SUCH_FILE == e.id) { + String onlyName = FileUtil.stripPath(name); + + try { + @SuppressWarnings("rawtypes") List files = channel.ls(directory); + // can return either null or an empty list depending on FTP servers + if (files == null) { + return false; + } + for (Object file : files) { + ChannelSftp.LsEntry entry = (ChannelSftp.LsEntry) file; + String existing = entry.getFilename(); + LOG.trace("Existing file: {}, target file: {}", existing, name); + existing = FileUtil.stripPath(existing); + if (existing != null && existing.equals(onlyName)) { + return true; + } + } return false; + } catch (SftpException e) { + // or an exception can be thrown with id 2 which means file does not + // exists + if (ChannelSftp.SSH_FX_NO_SUCH_FILE == e.id) { + return false; + } + // otherwise its a more serious error so rethrow + throw new GenericFileOperationFailedException(e.getMessage(), e); } - // otherwise its a more serious error so rethrow - throw new GenericFileOperationFailedException(e.getMessage(), e); + } finally { + lock.unlock(); } } @@ -1150,21 +1219,26 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } @Override - public synchronized boolean sendNoop() throws GenericFileOperationFailedException { - if (isConnected()) { - try { - session.sendKeepAliveMsg(); - return true; - } catch (Exception e) { - LOG.debug("SFTP session was closed. Ignoring this exception.", e); - return false; + public boolean sendNoop() throws GenericFileOperationFailedException { + lock.lock(); + try { + if (isConnected()) { + try { + session.sendKeepAliveMsg(); + return true; + } catch (Exception e) { + LOG.debug("SFTP session was closed. Ignoring this exception.", e); + return false; + } } + return false; + } finally { + lock.unlock(); } - return false; } @Override - public synchronized boolean sendSiteCommand(String command) throws GenericFileOperationFailedException { + public boolean sendSiteCommand(String command) throws GenericFileOperationFailedException { // is not implemented return true; } diff --git a/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java b/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java index 095b92c0657..9d391be7e50 100644 --- a/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java +++ b/core/camel-api/src/main/java/org/apache/camel/support/service/BaseService.java @@ -451,4 +451,7 @@ public abstract class BaseService { return Holder.LOG; } + protected Lock getInternalLock() { + return lock; + } }
