[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51159#action_51159 ]
Martin Krasser commented on CAMEL-1510:
---------------------------------------

Hi Christopher,

agreed that it's a bit more elegant to use the locking mechanism from {{java.util.concurrent.locks}} when using Java 5 or higher :) I also tried to solve the problem using {{ReentrantLock}} and {{Condition}}, but instead of using an {{exchangeQueued}} variable I let the {{enqueueExchange()}} and {{cancel()}} methods _signal_ the batch sender to resume processing. I tested the following code with the {{AggregatorTest}} unit tests.

{code:java}
private class BatchSender extends Thread {

    private volatile boolean cancelRequested;

    private Queue<Exchange> queue;
    private Lock queueMutex = new ReentrantLock();
    private Condition queueCondition = queueMutex.newCondition();

    public BatchSender() {
        super("Batch Sender");
        this.queue = new LinkedList<Exchange>();
    }

    @Override
    public void run() {
        while (true) {
            queueMutex.lock();
            try {
                // Wait for a signal from enqueueExchange()/cancel() or for the batch timeout.
                boolean signalled = queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                processEnqueuedExchanges(signalled);
            } catch (InterruptedException e) {
                break;
            } catch (Exception e) {
                // TODO: handle exception ...
                e.printStackTrace();
            } finally {
                queueMutex.unlock();
            }
        }
    }

    public void cancel() {
        cancelRequested = true;
        queueMutex.lock();
        try {
            queueCondition.signal();
        } finally {
            queueMutex.unlock();
        }
    }

    public void enqueueExchange(Exchange exchange) {
        queueMutex.lock();
        try {
            // Add under the lock: LinkedList is not thread-safe and the
            // sender thread drains the queue while holding queueMutex.
            queue.add(exchange);
            if (isInBatchCompleted(queue.size())) {
                queueCondition.signal();
            }
        } finally {
            queueMutex.unlock();
        }
    }

    private void processEnqueuedExchanges(boolean signalled) throws Exception {
        if (!signalled) {
            // Timeout expired: drain whatever has been queued so far.
            drainQueueTo(collection, batchSize);
        } else {
            if (cancelRequested) {
                return;
            }
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, batchSize);
            }
            if (!isOutBatchCompleted()) {
                return;
            }
        }
        try {
            sendExchanges();
        } catch (Exception e) {
            getExceptionHandler().handleException(e);
        }
    }

    private void sendExchanges() throws Exception {
        ...
    }

    private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
        ...
    }
}
{code}

Does this make sense to you? BTW, similar changes should also be applied to the stream resequencer. Let's close this issue only when both the {{BatchProcessor}} and the {{StreamResequencer}} are fixed.

> BatchProcessor interrupt has side effects
> -----------------------------------------
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 1.6.0, 2.0-M1
> Environment: Mac OS X
> Reporter: Christopher Hunt
> Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle
> of processing exchanges, and the processing involves something slow like
> establishing a JMS connection over SSL or queuing to an asynchronous
> processor, then the processing can become interrupted. The consequence of
> this side effect is that the batch sender thread rarely gets the opportunity
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in
> continuously adding exchanges to the aggregator, the threshold becoming
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer
> grained concurrency controls being used instead of relying upon interrupting
> a thread.
> Perhaps something like the following (untested) re-write of the sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue<Exchange> queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList<Exchange>();
>     }
>
>     public void cancel() {
>         interrupt();
>     }
>
>     private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandler().handleException(e);
>                         }
>                     } finally {
>                         queueMutex.lock();
>                     }
>                 } catch (InterruptedException e) {
>                     break;
>                 }
>             } while (true);
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     private void sendExchanges() throws Exception {
>         Iterator<Exchange> iter = collection.iterator();
>         while (iter.hasNext()) {
>             Exchange exchange = iter.next();
>             iter.remove();
>             processExchange(exchange);
>         }
>     }
> }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed
> its access. In addition any queuing of exchanges is noted. This should result
> in less locking.
> The main change though is that queuing an exchange does not interrupt the
> batch sender's current activity.
> I hope that this sample is useful.

--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
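
For readers following the thread, the idea discussed in this issue (waking the sender with a {{Condition}} signal instead of {{Thread.interrupt()}}) can be tried in isolation. The following is only a minimal sketch under stated assumptions, not the Camel implementation: {{SignalledBatcher}}, {{enqueue()}}, {{demo()}} and the use of plain strings in place of {{Exchange}} objects are all hypothetical names chosen for illustration. It waits with {{awaitNanos()}} in a loop so that a spurious wakeup does not cut a batch short, and it "sends" outside the lock so that producers never disturb a send in progress.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a batch sender; items are strings, not Camel exchanges. */
class SignalledBatcher extends Thread {
    private final Lock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private final Queue<String> queue = new LinkedList<String>(); // guarded by lock
    private final List<List<String>> sent = new CopyOnWriteArrayList<List<String>>();
    private final int batchSize;
    private final long timeoutMillis;
    private boolean cancelled; // guarded by lock

    SignalledBatcher(int batchSize, long timeoutMillis) {
        super("Signalled Batcher");
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Queues an item and signals the sender once a full batch is available. */
    void enqueue(String item) {
        lock.lock();
        try {
            queue.add(item);
            if (queue.size() >= batchSize) {
                wakeUp.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Asks the sender to stop; it is woken by a signal, never interrupted. */
    void cancel() {
        lock.lock();
        try {
            cancelled = true;
            wakeUp.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        while (true) {
            List<String> batch = new ArrayList<String>();
            lock.lock();
            try {
                // Wait until a full batch is queued, cancel is requested, or the timeout expires.
                long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
                while (!cancelled && queue.size() < batchSize && nanos > 0) {
                    nanos = wakeUp.awaitNanos(nanos);
                }
                if (cancelled) {
                    return;
                }
                while (batch.size() < batchSize && !queue.isEmpty()) {
                    batch.add(queue.poll());
                }
            } catch (InterruptedException e) {
                return;
            } finally {
                lock.unlock();
            }
            // "Send" outside the lock, so enqueue() cannot disturb a send in progress.
            if (!batch.isEmpty()) {
                sent.add(batch);
            }
        }
    }

    /** Usage demo: enqueue one full batch, wait until it has been sent, then cancel. */
    static List<List<String>> demo() {
        SignalledBatcher batcher = new SignalledBatcher(3, 10000);
        batcher.start();
        batcher.enqueue("a");
        batcher.enqueue("b");
        batcher.enqueue("c");
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (batcher.sent.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            batcher.cancel();
            batcher.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batcher.sent;
    }
}
```

Calling {{SignalledBatcher.demo()}} enqueues three items with a batch size of three, so the sender is signalled well before its ten-second timeout. Items left in the queue when {{cancel()}} is called are dropped in this sketch; whether a real sender should flush them is a separate design decision.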