CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cf288b6a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cf288b6a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cf288b6a Branch: refs/heads/master Commit: cf288b6a1ce8828ea500077f55161c67c4d95a33 Parents: e58534a Author: Claus Ibsen <[email protected]> Authored: Sun Feb 21 10:24:31 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Sun Feb 21 11:02:47 2016 +0100 ---------------------------------------------------------------------- .../component/sjms/batch/SessionCompletion.java | 6 +- .../component/sjms/batch/SjmsBatchConsumer.java | 149 ++++++++++++++----- .../component/sjms/batch/SjmsBatchEndpoint.java | 69 +++++++-- 3 files changed, 176 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java index cae90cb..f2a7e69 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java @@ -41,8 +41,7 @@ class SessionCompletion implements Synchronization { LOG.debug("Committing"); session.commit(); } catch (JMSException ex) { - LOG.error("Exception caught while committing: {}", ex.getMessage()); - exchange.setException(ex); + LOG.warn("Exception caught while committing JMS session", ex); } } @@ -52,8 +51,7 @@ class SessionCompletion implements Synchronization { LOG.debug("Rolling back"); 
session.rollback(); } catch (JMSException ex) { - LOG.error("Exception caught while rolling back: {}", ex.getMessage()); - exchange.setException(ex); + LOG.warn("Exception caught while rolling back JMS session", ex); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index 505aa07..5316664 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -16,11 +16,10 @@ */ package org.apache.camel.component.sjms.batch; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,6 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SjmsBatchConsumer extends DefaultConsumer { + + public static final String SJMS_BATCH_TIMEOUT_CHECKER = "SJmsBatchTimeoutChecker"; + private static final boolean TRANSACTED = true; private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class); @@ -53,9 +55,14 @@ public class SjmsBatchConsumer extends DefaultConsumer { private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong(); private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong(); + private ScheduledExecutorService timeoutCheckerExecutorService; + private boolean 
shutdownTimeoutCheckerExecutorService; + private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean(); + private final SjmsBatchEndpoint sjmsBatchEndpoint; private final AggregationStrategy aggregationStrategy; private final int completionSize; + private final int completionInterval; private final int completionTimeout; private final int consumerCount; private final int pollDuration; @@ -76,7 +83,11 @@ public class SjmsBatchConsumer extends DefaultConsumer { destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName"); completionSize = sjmsBatchEndpoint.getCompletionSize(); + completionInterval = sjmsBatchEndpoint.getCompletionInterval(); completionTimeout = sjmsBatchEndpoint.getCompletionTimeout(); + if (completionInterval > 0 && completionTimeout != SjmsBatchEndpoint.DEFAULT_COMPLETION_TIMEOUT) { + throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); + } pollDuration = sjmsBatchEndpoint.getPollDuration(); if (pollDuration < 0) { throw new IllegalArgumentException("pollDuration must be 0 or greater"); @@ -98,18 +109,21 @@ public class SjmsBatchConsumer extends DefaultConsumer { return sjmsBatchEndpoint; } + public ScheduledExecutorService getTimeoutCheckerExecutorService() { + return timeoutCheckerExecutorService; + } + + public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { + this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; + } + @Override protected void doStart() throws Exception { super.doStart(); // start up a shared connection - try { - connection = connectionFactory.createConnection(); - connection.start(); - } catch (JMSException ex) { - LOG.error("Exception caught closing connection: {}", getStackTrace(ex)); - return; - } + connection = connectionFactory.createConnection(); + connection.start(); if (LOG.isInfoEnabled()) { LOG.info("Starting " + consumerCount + " consumer(s) for 
" + destinationName + ":" + completionSize); @@ -121,12 +135,24 @@ public class SjmsBatchConsumer extends DefaultConsumer { for (int i = 0; i < consumerCount; i++) { jmsConsumerExecutors.execute(new BatchConsumptionLoop()); } + + if (completionInterval > 0) { + LOG.info("Using CompletionInterval to run every " + completionInterval + " millis."); + if (timeoutCheckerExecutorService == null) { + setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1)); + shutdownTimeoutCheckerExecutorService = true; + } + // trigger completion based on interval + timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(completionTimeoutTrigger), completionInterval, completionInterval, TimeUnit.MILLISECONDS); + } + } @Override protected void doStop() throws Exception { super.doStop(); running.set(false); + CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get(); if (consumersShutdownLatch != null) { LOG.info("Stop signalled, waiting on consumers to shut down"); @@ -142,17 +168,40 @@ public class SjmsBatchConsumer extends DefaultConsumer { try { LOG.debug("Shutting down JMS connection"); connection.close(); - } catch (JMSException jex) { - LOG.error("Exception caught closing connection: {}", getStackTrace(jex)); + } catch (Exception e) { + LOG.warn("Exception caught closing JMS connection", e); } getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(jmsConsumerExecutors); + jmsConsumerExecutors = null; + + if (shutdownTimeoutCheckerExecutorService) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService); + timeoutCheckerExecutorService = null; + } } - private String getStackTrace(Exception ex) { - StringWriter writer = new StringWriter(); - ex.printStackTrace(new PrintWriter(writer)); - return writer.toString(); + /** + * Background task that triggers completion based on interval. 
+ */ + private final class CompletionIntervalTask implements Runnable { + + private final AtomicBoolean timeoutInterval; + + public CompletionIntervalTask(AtomicBoolean timeoutInterval) { + this.timeoutInterval = timeoutInterval; + } + + public void run() { + // only run if CamelContext has been fully started + if (!getEndpoint().getCamelContext().getStatus().isStarted()) { + LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", getEndpoint().getCamelContext().getName()); + return; + } + + // signal + timeoutInterval.set(true); + } } private class BatchConsumptionLoop implements Runnable { @@ -168,24 +217,32 @@ public class SjmsBatchConsumer extends DefaultConsumer { MessageConsumer consumer = session.createConsumer(queue); try { - consumeBatchesOnLoop(session, consumer); + consumeBatchesOnLoop(session, consumer, completionTimeoutTrigger); } finally { try { consumer.close(); } catch (JMSException ex2) { - log.error("Exception caught closing consumer: {}", ex2.getMessage()); + // only include stacktrace in debug logging + if (log.isDebugEnabled()) { + log.debug("Exception caught closing consumer", ex2); + } + log.warn("Exception caught closing consumer: {}", ex2.getMessage()); } } } finally { try { session.close(); } catch (JMSException ex1) { - log.error("Exception caught closing session: {}", ex1.getMessage()); + // only include stacktrace in debug logging + if (log.isDebugEnabled()) { + log.debug("Exception caught closing session: {}", ex1); + } + log.warn("Exception caught closing session: {}", ex1.getMessage()); } } } catch (JMSException ex) { // from loop - LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex)); + LOG.warn("Exception caught consuming from " + destinationName, ex); } finally { // indicate that we have shut down CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get(); @@ -193,20 +250,23 @@ public class SjmsBatchConsumer extends DefaultConsumer { } } - 
private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException { + private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer, final AtomicBoolean timeoutInterval) throws JMSException { final boolean usingTimeout = completionTimeout > 0; batchConsumption: while (running.get()) { + // reset the state + boolean timeout = false; int messageCount = 0; - - // reset the clock counters long timeElapsed = 0; long startTime = 0; Exchange aggregatedExchange = null; batch: - while ((completionSize <= 0) || (messageCount < completionSize)) { + // loop while no timeout or interval triggered and while we have room still for messages in the batch + while (!timeout && !timeoutInterval.compareAndSet(true, false) + && (usingTimeout || (completionSize > 0 && messageCount < completionSize))) { + // check periodically to see whether we should be shutting down long waitTime = (usingTimeout && (timeElapsed > 0)) ? getReceiveWaitTime(timeElapsed) @@ -222,10 +282,9 @@ public class SjmsBatchConsumer extends DefaultConsumer { startTime = new Date().getTime(); // start counting down the period for this batch } messageCount++; - LOG.debug("Message received: {}", messageCount); - if ((message instanceof ObjectMessage) - || (message instanceof TextMessage)) { - + LOG.debug("#{} messages received", messageCount); + // TODO: why only object or text messages? 
+ if (message instanceof ObjectMessage || message instanceof TextMessage) { final Exchange exchange = getEndpoint().createExchange(message, session); aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange); aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount); @@ -241,7 +300,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { if (timeElapsed > completionTimeout) { // batch finished by timeout - break batch; + timeout = true; } } @@ -250,8 +309,14 @@ public class SjmsBatchConsumer extends DefaultConsumer { session.rollback(); break batchConsumption; } - } // batch - process(aggregatedExchange, session); + } + + // batch + if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) { + processEmptyMessage(); + } else if (aggregatedExchange != null) { + processBatch(aggregatedExchange, session); + } } } @@ -268,9 +333,9 @@ public class SjmsBatchConsumer extends DefaultConsumer { if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely timeRemaining = 1; } - final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining; + final long waitTime = Math.min(timeRemaining, pollDuration); - LOG.debug("waiting for {}", waitTime); + LOG.trace("Waiting for {}", waitTime); return waitTime; } @@ -282,7 +347,23 @@ public class SjmsBatchConsumer extends DefaultConsumer { return timeRemaining; } - private void process(Exchange exchange, Session session) { + /** + * No messages in batch so send an empty message instead. + */ + private void processEmptyMessage() { + Exchange exchange = getEndpoint().createExchange(); + log.debug("Sending empty message as there were no messages from polling: {}", getEndpoint()); + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } + } + + /** + * Send an message with the batches messages. 
+ */ + private void processBatch(Exchange exchange, Session session) { int id = BATCH_COUNT.getAndIncrement(); int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class); if (LOG.isDebugEnabled()) { @@ -296,7 +377,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { long total = MESSAGE_PROCESSED.addAndGet(batchSize); LOG.debug("Completed processing[{}]:total={}", id, total); } catch (Exception e) { - LOG.error("Error processing exchange: {}", e.getMessage()); + getExceptionHandler().handleException("Error processing exchange", exchange, e); } } http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java index 790f1ef..d989e08 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.sjms.batch; +import java.util.concurrent.ScheduledExecutorService; import javax.jms.Message; import javax.jms.Session; @@ -55,6 +56,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt @UriPath @Metadata(required = "true") private String destinationName; + @UriParam @Metadata(required = "true") + private AggregationStrategy aggregationStrategy; @UriParam(defaultValue = "1") private int consumerCount = 1; @UriParam(defaultValue = "200") @@ -62,22 +65,25 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt @UriParam(defaultValue = "500") private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT; 
@UriParam(defaultValue = "1000") - private int pollDuration = 1000; - @UriParam @Metadata(required = "true") - private AggregationStrategy aggregationStrategy; + private int completionInterval; @UriParam - private HeaderFilterStrategy headerFilterStrategy; + private boolean sendEmptyMessageWhenIdle; + @UriParam(defaultValue = "1000") + private int pollDuration = 1000; @UriParam private boolean includeAllJMSXProperties; @UriParam(defaultValue = "true") private boolean allowNullBody = true; @UriParam(defaultValue = "true") private boolean mapJmsMessage = true; - @UriParam + @UriParam(label = "advanced") + private HeaderFilterStrategy headerFilterStrategy; + @UriParam(label = "advanced") private MessageCreatedStrategy messageCreatedStrategy; - @UriParam + @UriParam(label = "advanced") private JmsKeyFormatStrategy jmsKeyFormatStrategy; - + @UriParam(label = "advanced") + private ScheduledExecutorService timeoutCheckerExecutorService; public SjmsBatchEndpoint() { } @@ -100,12 +106,15 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt @Override public Producer createProducer() throws Exception { - throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName()); + throw new UnsupportedOperationException("Producer not supported"); } @Override public Consumer createConsumer(Processor processor) throws Exception { - return new SjmsBatchConsumer(this, processor); + SjmsBatchConsumer consumer = new SjmsBatchConsumer(this, processor); + consumer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService); + configureConsumer(consumer); + return consumer; } public Exchange createExchange(Message message, Session session) { @@ -181,12 +190,41 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt } /** - * The timeout from receipt of the first first message when the batch will be completed + * The timeout in millis from receipt of the first first message when the batch will 
be completed. + * The batch may be empty if the timeout triggered and there were no messages in the batch. + * <br/> + * Notice you cannot use both completion timeout and completion interval at the same time, only one can be configured. */ public void setCompletionTimeout(int completionTimeout) { this.completionTimeout = completionTimeout; } + public int getCompletionInterval() { + return completionInterval; + } + + /** + * The completion interval in millis, which causes batches to be completed at a scheduled fixed rate every interval. + * The batch may be empty if the interval triggered and there were no messages in the batch. + * <br/> + * Notice you cannot use both completion timeout and completion interval at the same time, only one can be configured. + */ + public void setCompletionInterval(int completionInterval) { + this.completionInterval = completionInterval; + } + + public boolean isSendEmptyMessageWhenIdle() { + return sendEmptyMessageWhenIdle; + } + + /** + * If using completion timeout or interval, then the batch may be empty if the timeout or interval triggered and there were no messages in the batch. + * If this option is <tt>true</tt> and the batch is empty then an empty message is added to the batch so an empty message is routed. + */ + public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) { + this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle; + } + public int getPollDuration() { return pollDuration; } @@ -280,4 +318,15 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt this.includeAllJMSXProperties = includeAllJMSXProperties; } + public ScheduledExecutorService getTimeoutCheckerExecutorService() { + return timeoutCheckerExecutorService; + } + + /** + * If using the completionInterval option a background thread is created to trigger the completion interval. + * Set this option to provide a custom thread pool to be used rather than creating a new thread for every consumer.
+ */ + public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { + this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; + } }
