[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51163#action_51163 ]
Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

"my intention was to provide an implementation that signals the batch sender to stop waiting when the batch size has been reached and to continue processing. I think we should keep that."

I see the problem here - I forgot to signal the condition variable when an exchange is queued. I will update my original comment to reflect this. My sincere apologies for the confusion. Would you mind re-reviewing my code?

"Maybe we should also consider to have a shared implementation for the wait/signal/cancel mechanisms for the BatchProcessor and the StreamResequencer, otherwise, we'd need to implement similar things in two different places."

I agree, a shared batch sender style of class would be useful.

"Do you want to provide a patch file plus some tests or should we wait for comments from one of the committers how to proceed?"

I'm happy to provide a patch file, though I did have difficulty building the Camel distro. I could try again. It would be great to receive some more feedback on this issue.

Thanks again for the dialogue.
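To make the first point concrete: the fix amounts to signalling the condition variable when an exchange is queued, so that a sender blocked in await() wakes up without being interrupted. A minimal, untested sketch, using the field names from the snippet quoted below:

{code}
public void enqueueExchange(Exchange exchange) {
    queueMutex.lock();
    try {
        queue.add(exchange);
        exchangeQueued = true;
        // Wake the batch sender if it is waiting on the condition,
        // rather than interrupting its thread.
        queueCondition.signal();
    } finally {
        queueMutex.unlock();
    }
}
{code}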
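On the shared implementation: a small helper encapsulating the wait/signal/cancel handling could be reused by both the BatchProcessor and the StreamResequencer. The following is only a rough, untested sketch; the class name and its methods are illustrative rather than existing Camel API:

{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical shared helper for wait/signal/cancel handling; a sketch only.
class BatchSignal {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean signalled;
    private boolean cancelled;

    // Wait until signalled, cancelled or the timeout elapses. Returns true if
    // a signal was received. A single await is used for brevity, so a spurious
    // wake-up is simply treated like a timeout.
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            if (!signalled && !cancelled) {
                condition.await(timeout, unit);
            }
            boolean wasSignalled = signalled;
            signalled = false;
            return wasSignalled;
        } finally {
            lock.unlock();
        }
    }

    // Called by the producer side (e.g. enqueueExchange) to wake the sender.
    public void signal() {
        lock.lock();
        try {
            signalled = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    // Called on shutdown; the sender checks isCancelled() after each await.
    public void cancel() {
        lock.lock();
        try {
            cancelled = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public boolean isCancelled() {
        lock.lock();
        try {
            return cancelled;
        } finally {
            lock.unlock();
        }
    }
}
{code}

With something along these lines, the sender's run loop would await on the helper and the enqueue path would signal it, so neither the BatchProcessor nor the StreamResequencer would need to interrupt a busy thread.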
> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class's
> interrupt() method, called from enqueueExchange(), to wake the run loop from
> its sleep. The unfortunate side effect of this is that if the run loop is in
> the middle of processing exchanges, and the processing involves something
> slow like establishing a JMS connection over SSL or queuing to an
> asynchronous processor, then that processing can be interrupted. The
> consequence of this side effect is that the batch sender thread rarely gets
> the opportunity to complete properly, and exceptions regarding the interrupt
> are thrown.
> This all became apparent during some performance testing that involved
> continuously adding exchanges to the aggregator, the threshold being
> reached, and the aggregated result then being enqueued to a JMS queue.
> If my analysis of the BatchProcessor is correct, then I would recommend
> using finer-grained concurrency controls instead of relying upon
> interrupting a thread. Perhaps something like the following (untested)
> re-write of the sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue<Exchange> queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList<Exchange>();
>     }
>
>     public void cancel() {
>         // The interrupt is now used only to shut the sender down; see the
>         // handling of InterruptedException in run().
>         interrupt();
>     }
>
>     // Moves up to batchSize exchanges from the queue into the collection.
>     // Callers hold queueMutex.
>     private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>             // NOTE: as discussed in the comment above, queueCondition.signal()
>             // is also needed here to wake a waiting sender.
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     @Override
>     public void run() {
>         // The mutex is held at the top of each iteration and released while
>         // draining and sending, so enqueueExchange is not blocked for long.
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         // Nothing was queued recently: wait out the batch
>                         // timeout, then send whatever has accumulated.
>                         queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     // Release the lock while the batch is sent so that new
>                     // exchanges can still be queued in the meantime.
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandler().handleException(e);
>                         }
>                     } finally {
>                         queueMutex.lock();
>                     }
>                 } catch (InterruptedException e) {
>                     // Raised by cancel(); terminate the sender.
>                     break;
>                 }
>             } while (true);
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     private void sendExchanges() throws Exception {
>         Iterator<Exchange> iter = collection.iterator();
>         while (iter.hasNext()) {
>             Exchange exchange = iter.next();
>             iter.remove();
>             processExchange(exchange);
>         }
>     }
> }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed
> its access. In addition, any queuing of exchanges is noted via the
> exchangeQueued flag. This should result in less locking.
> The main change, though, is that queuing an exchange does not interrupt the
> batch sender's current activity.
> I hope that this sample is useful.

--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.