I have noticed that the BatchProcessor class uses the Thread class interrupt method to wake the run loop from sleeping within the enqueueExchange method.
The unfortunate side effect of this is that if the run loop is in the middle of processing exchanges, and the processing involves something slow like establishing a JMS connection over SSL, then the processing can become interrupted. This all became apparent during some performance testing that resulted in continuously adding exchanges to the aggregator, the threshold becoming reached, and then trying to enqueue the aggregated result to a JMS queue. If my analysis of the BatchProcessor is correct then I would recommend a Condition object being used instead of relying upon interrupting a thread. Perhaps something like the following (untested - could be mistakes of course - assumes present Camel 1.6 trunk): private class BatchSender extends Thread { private boolean enqueuedExchange = false; private Lock runLock = new ReentrantLock(); private Condition runCondition = runLock.newCondition(); private LinkedBlockingQueue queue; public BatchSender() { super("Batch Sender"); this.queue = new LinkedBlockingQueue(); } @Override public void run() { runLock.lock(); try { do { try { if (!enqueuedExchanged) { runCondition.await(batchTimeout, TimeUnit.MILLISECONDS); if (!enqueuedExchanged) { queue.drainTo(collection, batchSize); } } if (enqueuedExchanged) { enqueuedExchange = false; runLock.unlock(); try { while (isInBatchCompleted(queue.size())) { queue.drainTo(collection, batchSize); } if (!isOutBatchCompleted()) { continue; } } finally { runLock.lock(); } } runLock.unlock(); try { try { sendExchanges(); } catch (Exception e) { getExceptionHandler().handleException(e); } } finally { runLock.lock(); } } catch (InterruptedException e) { break; } } while (true); } finally { runLock.unlock(); } } public void cancel() { interrupt(); } public void enqueueExchange(Exchange exchange) { queue.add(exchange); runLock.lock(); try { enqueuedExchange = true; runCondition.signal(); } finally { runLock.unlock(); } } private void sendExchanges() throws Exception { Iterator iter = collection.iterator(); while (iter.hasNext()) { Exchange exchange = iter.next(); iter.remove(); processExchange(exchange); } } } I acknowledge that the above is more complex than what is there presently but I think the complexity is necessary. Cancellations will still result in an interrupt and behave as they presently do. The major difference is considering what the condition is going into a sleep state (nothing enqueued that hasn't had its predicate tested), and when it comes out of sleep (if it has been), what must then be done. Either exchanges have been enqueued or it is time to process exchanges. Even if exchanges have been enqueued then flow can still drop down into sending exchanges if the batch thresholds have been reached i.e. as before. I also make a point of unlocking the runLock prior to calling out to dependent code. Thoughts? -- View this message in context: http://www.nabble.com/BatchProcessor-interrupt-side-effects-tp22819960p22819960.html Sent from the Camel - Development (activemq) mailing list archive at Nabble.com.