[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51159#action_51159
 ] 

Martin Krasser commented on CAMEL-1510:
---------------------------------------

Hi Christopher,

Agreed, it's a bit more elegant to use the locking mechanism from 
{{java.util.concurrent.locks}} when using Java 5 or higher :) I also tried to 
solve the problem using {{ReentrantLock}} and {{Condition}}, but instead of 
using an {{exchangeQueued}} variable I let the {{enqueueExchange()}} and 
{{cancel()}} methods _signal_ the batch sender to resume processing. I 
tested the following code with the {{AggregatorTest}} unit tests.

{code:java}
    private class BatchSender extends Thread {
        
        private volatile boolean cancelRequested;

        private Queue<Exchange> queue;
        
        private Lock queueMutex = new ReentrantLock();
        
        private Condition queueCondition = queueMutex.newCondition();

        public BatchSender() {
            super("Batch Sender");
            this.queue = new LinkedList<Exchange>();
        }

        @Override
        public void run() {
            while (true) {
                queueMutex.lock();
                try {
                    boolean signalled = queueCondition.await(batchTimeout,
                            TimeUnit.MILLISECONDS);
                    processEnqueuedExchanges(signalled);
                } catch (InterruptedException e) {
                    break;
                } catch (Exception e) {
                    // TODO: handle exception ...
                    e.printStackTrace();
                } finally {
                    queueMutex.unlock();
                }
                
            }
        }
        
        public void cancel() {
            cancelRequested = true;
            queueMutex.lock();
            try {
                queueCondition.signal();
            } finally {
                queueMutex.unlock();
            }
        }
     
        public void enqueueExchange(Exchange exchange) {
            queueMutex.lock();
            try {
                // add while holding the lock: LinkedList is not thread-safe
                queue.add(exchange);
                if (isInBatchCompleted(queue.size())) {
                    queueCondition.signal();
                }
            } finally {
                queueMutex.unlock();
            }
        }
        
        private void processEnqueuedExchanges(boolean signalled)
                throws Exception {

            if (!signalled) { 
                drainQueueTo(collection, batchSize); 
            } else {
                if (cancelRequested) {
                    return;
                }
                while (isInBatchCompleted(queue.size())) {
                    drainQueueTo(collection, batchSize);  
                }
                
                if (!isOutBatchCompleted()) {
                    return;
                }
            }
            
            
            try {
                sendExchanges();
            } catch (Exception e) {
                getExceptionHandler().handleException(e);
            }
            
        }
        
        private void sendExchanges() throws Exception {
           ...
        }
        
        private void drainQueueTo(Collection<Exchange> collection,
                int batchSize) {
           ...
        }

    }    
{code}
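
To make the signalling idea easier to try outside of Camel, here is a minimal, self-contained sketch of the same pattern. It is not the {{BatchProcessor}} code: {{SignalledBatcher}}, {{enqueue()}} and {{takeBatch()}} are hypothetical names, and plain strings stand in for exchanges. The producer signals the condition only once a full batch is queued, and the consumer wakes up either on that signal or when the timeout expires.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the batch sender; Strings replace Exchanges. */
class SignalledBatcher {

    private final Lock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private final Queue<String> queue = new LinkedList<String>();
    private final int batchSize;
    private final long timeoutMillis;

    SignalledBatcher(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Producer side: add under the lock, signal once a full batch is queued. */
    void enqueue(String item) {
        lock.lock();
        try {
            queue.add(item);
            if (queue.size() >= batchSize) {
                wakeUp.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer side: wait until a full batch is queued or the timeout
     * elapses, then drain at most batchSize items.
     */
    List<String> takeBatch() throws InterruptedException {
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            // Re-check the queue after every wake-up; awaitNanos returns the
            // remaining wait time, which is <= 0 once the timeout has elapsed.
            while (queue.size() < batchSize && nanos > 0) {
                nanos = wakeUp.awaitNanos(nanos);
            }
            List<String> batch = new ArrayList<String>();
            while (batch.size() < batchSize && !queue.isEmpty()) {
                batch.add(queue.poll());
            }
            return batch;
        } finally {
            lock.unlock();
        }
    }
}
```

The sketch waits in a loop that re-checks the queue size, as the {{Condition}} documentation recommends, so a spurious wake-up does not produce a short batch. Nothing here interrupts the consumer thread, so whatever it does with a batch after {{takeBatch()}} returns is not disturbed by later {{enqueue()}} calls.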

Does this make sense to you?

BTW similar changes should also be applied to the stream resequencer. Let's 
close this issue only when both the {{BatchProcessor}} and 
{{StreamResequencer}} are fixed.


> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> continuously adding exchanges to the aggregator, the threshold becoming 
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer 
> grained concurrency controls being used instead of relying upon interrupting 
> a thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>         public void cancel() {
>             interrupt();
>         }
>         private void drainQueueTo(Collection<Exchange> collection,
>                 int batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout,
>                                     TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed 
> its access. In addition any queuing of exchanges is noted. This should result 
> in less locking.
> The main change though is that queuing an exchange does not interrupt the 
> batch sender's current activity.
> I hope that this sample is useful.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
