This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e76bf0f CAMEL-15653: camel-sjms - Improve logging for sjms batch
recovery task. Thanks to Brad Harvey for the patch.
e76bf0f is described below
commit e76bf0ffd1738386d580782f3c0c71deda4b2fa3
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Dec 6 18:21:29 2020 +0100
CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task.
Thanks to Brad Harvey for the patch.
---
.../component/sjms/batch/SjmsBatchComponent.java | 22 -----------
.../component/sjms/batch/SjmsBatchConsumer.java | 44 +++++++++++++---------
2 files changed, 27 insertions(+), 39 deletions(-)
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 8bdb635..ddb0dce 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.sjms.batch;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import javax.jms.ConnectionFactory;
@@ -30,8 +29,6 @@ import org.apache.camel.util.ObjectHelper;
@Component("sjms-batch")
public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
- private ExecutorService asyncStartStopExecutorService;
-
@Metadata(label = "advanced")
private ConnectionFactory connectionFactory;
@Metadata(label = "advanced")
@@ -97,23 +94,4 @@ public class SjmsBatchComponent extends
HeaderFilterStrategyComponent {
this.recoveryInterval = recoveryInterval;
}
- @Override
- protected void doShutdown() throws Exception {
- if (asyncStartStopExecutorService != null) {
-
getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
- asyncStartStopExecutorService = null;
- }
- super.doShutdown();
- }
-
- protected synchronized ExecutorService getAsyncStartStopExecutorService() {
- if (asyncStartStopExecutorService == null) {
- // use a cached thread pool for async start tasks as they can run
for a while, and we need a dedicated thread
- // for each task, and the thread pool will shrink when no more
tasks running
- asyncStartStopExecutorService
- =
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this,
"AsyncStartStopListener");
- }
- return asyncStartStopExecutorService;
- }
-
}
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index a625877..b8c7e95 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -42,6 +42,7 @@ import org.apache.camel.ExtendedExchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
@@ -138,8 +139,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
= new StartConsumerTask(recovery,
getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay());
if (recovery) {
- // use a background thread to keep starting the consumer until
-
getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(task);
+ // use a background thread to keep starting the consumer until it
can connect successfully
+ String threadNameSuffix = "AsyncStartStopListener[" +
destinationName + "]";
+ ExecutorServiceManager executorServiceManager =
getEndpoint().getCamelContext().getExecutorServiceManager();
+ Thread thread = executorServiceManager.newThread(threadNameSuffix,
task);
+ thread.start();
} else {
task.run();
}
@@ -168,7 +172,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
if (completionInterval > 0) {
- LOG.info("Using CompletionInterval to run every {} millis.",
completionInterval);
+ LOG.info("Using CompletionInterval to run every {} millis for
{}.", completionInterval, destinationName);
if (timeoutCheckerExecutorService == null) {
setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager()
.newScheduledThreadPool(this,
SJMS_BATCH_TIMEOUT_CHECKER, 1));
@@ -208,7 +212,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
if (attempt > 1) {
- LOG.info("Successfully refreshed connection after {}
attempts.", attempt);
+ LOG.info("Successfully refreshed connection to {}
after {} attempts.", destinationName, attempt);
}
LOG.info("Started {} consumer(s) for {}:{}",
consumerCount, destinationName, completionSize);
@@ -234,7 +238,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
// sleeping before next attempt
try {
- LOG.debug("Attempt #{}. Sleeping {} before next attempt to
recover", attempt, recoveryInterval);
+ LOG.debug("Attempt #{}. Sleeping {} before next attempt to
recover {}", attempt, recoveryInterval,
+ destinationName);
Thread.sleep(recoveryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -251,18 +256,19 @@ public class SjmsBatchConsumer extends DefaultConsumer {
CountDownLatch consumersShutdownLatch =
consumersShutdownLatchRef.get();
if (consumersShutdownLatch != null) {
- LOG.info("Stop signalled, waiting on consumers to shut down");
+ LOG.info("Stop signalled, waiting on consumers for {} to shut
down", destinationName);
if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) {
- LOG.warn("Timeout waiting on consumer threads to signal
completion - shutting down");
+ LOG.warn("Timeout waiting on consumer threads for {} to signal
completion - shutting down", destinationName);
} else {
- LOG.info("All consumers have been shutdown");
+ LOG.info("All consumers for {} have been shutdown",
destinationName);
}
} else {
- LOG.info("Stop signalled while there are no consumers yet, so no
need to wait for consumers");
+ LOG.info("Stop signalled while there are no consumers for {} yet,
so no need to wait for consumers",
+ destinationName);
}
try {
- LOG.debug("Shutting down JMS connection");
+ LOG.debug("Shutting down JMS connection for {}", destinationName);
connection.close();
} catch (Exception e) {
// ignore
@@ -374,7 +380,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (LOG.isDebugEnabled()) {
LOG.debug("Exception caught closing consumer", ex2);
}
- LOG.warn("Exception caught closing consumer: {}. This
exception is ignored.", ex2.getMessage());
+ LOG.warn("Exception caught closing consumer for : {}. This
exception is ignored.", destinationName,
+ ex2.getMessage());
}
}
@@ -388,7 +395,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (LOG.isDebugEnabled()) {
LOG.debug("Exception caught closing session", ex2);
}
- LOG.warn("Exception caught closing session: {}. This exception
is ignored.", ex2.getMessage());
+ LOG.warn("Exception caught closing session for {}: {}. This
exception is ignored.", destinationName,
+ ex2.getMessage());
}
}
@@ -473,8 +481,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
reset();
}
} catch (Exception e) {
- LOG.warn("Error during evaluation of
completion predicate {}. This exception is ignored.",
- e.getMessage(), e);
+ LOG.warn("Error during evaluation of
completion predicate " + e.getMessage()
+ + ". This exception is ignored.",
+ e);
}
}
}
@@ -494,7 +503,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
} else {
- LOG.info("Shutdown signal received - rolling back
batch");
+ LOG.info("Shutdown signal received - rolling back {}
pending in batch from destination {}",
+ messageCount, destinationName);
session.rollback();
}
}
@@ -569,7 +579,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
int batchSize = exchange.getProperty(Exchange.BATCH_SIZE,
Integer.class);
if (LOG.isDebugEnabled()) {
long total = MESSAGE_RECEIVED.get() + batchSize;
- LOG.debug("Processing batch[{}]:size={}:total={}", id,
batchSize, total);
+ LOG.debug("Processing batch[" + id + "]:size=" + batchSize +
":total=" + total);
}
if ("timeout".equals(completedBy)) {
@@ -587,7 +597,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
long total = MESSAGE_PROCESSED.addAndGet(batchSize);
LOG.debug("Completed processing[{}]:total={}", id, total);
} catch (Exception e) {
- getExceptionHandler().handleException("Error processing
exchange", exchange, e);
+ getExceptionHandler().handleException("Error processing
exchange from " + destinationName, exchange, e);
}
}