[ 
https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190435#comment-13190435
 ] 

Claus Ibsen commented on CAMEL-4925:
------------------------------------

After looking into this for a bit, then the JDK does *not* offer good APIs for 
being able to do custom logic when rejected, by which you get access to the 
inner details of the runnable task you submitted to the thread pool. That means 
for DiscardOldest, you do *not* know which Exchange is to be discarded, as you 
cannot get access to the Exchange. The JDK ThreadPoolExecutor will wrap the 
runnable/FutureTask using an adapter, which does not expose API for you to get 
access to the Exchange.

Even if you create a custom FutureTask and submit that, then it's the adapter 
you get when the task is rejected. And the adapter does not allow to get you to 
your custom FutureTask.

So the best we can do is to deny supporting DiscardOldest, and then we can 
handle Abort and Discard in the ThreadsProcessor, where we can mark the 
exchange to stop routing, so the exchange will be done, which means that it 
will be unregistered from the inflight registry and whatnot.
                
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or 
> DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CamelRoutingTest.java
>
>
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or 
> DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> Here is the code from ThreadsProcessor. In case of DiscardPolicy or 
> DiscardOldestPolicy executorService will no throw RejectedExecutionException, 
> so exchange remains unprocessed and count of inflight exchanges will not be 
> decremented for such discarded exchanges.
> {code:java|title=ThreadsProcessor#process(Exchange, AsyncCallback)}
> public boolean process(Exchange exchange, AsyncCallback callback) {
>     if (shutdown.get()) {
>         throw new IllegalStateException("ThreadsProcessor is not running.");
>     }
>     ProcessCall call = new ProcessCall(exchange, callback);
>     try {
>         executorService.submit(call);
>         // tell Camel routing engine we continue routing asynchronous
>         return false;
>     } catch (RejectedExecutionException e) {
>         if (isCallerRunsWhenRejected()) {
>             if (shutdown.get()) {
>                 exchange.setException(new RejectedExecutionException());
>             } else {
>                 callback.done(true);
>             }
>         } else {
>             exchange.setException(e);
>         }
>         return true;
>     }
> }
> {code}
> Unit test is attached.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to