[ 
https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190471#comment-13190471
 ] 

Sergey Zhemzhitsky commented on CAMEL-4925:
-------------------------------------------

Hi Claus,
I noticed the same things you mentioned and attached a sample patch that can 
possibly be used to support correct rejection of submitted tasks. The idea is 
to provide our own _ThreadPoolExecutor_ and _ScheduledThreadPoolExecutor_ from 
_DefaultThreadPoolFactory_. These pools wrap submitted tasks with custom 
_FutureTask_ that supports rejection. _RejectedExecutionHandlers_ in the 
_ThreadPoolRejectedPolicy_ are also changed to check whether the discarded 
tasks can be rejected (I also added blocking policy into the 
_ThreadPoolRejectedPolicy_).

So if submitted tasks do not implement _Rejectable_ interface the behavior is 
exactly the same as with ordinary _ThreadPoolExecutor_ and 
_ScheduledThreadPoolExecutor_. If these tasks are _Rejectable_, their 
_reject()_ method will be called. 

The only problem is what to do if the user provides its own ExecutorService to 
configure the ThreadsProcessor. I suppose in that case this user should be 
fully responsible for handling rejections correctly (it should be mentions in 
camel docs or somewhere else).
                
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or 
> DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-4925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.8.0
>            Reporter: Sergey Zhemzhitsky
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.10.0
>
>         Attachments: CAMEL-4925.patch, CamelRoutingTest.java
>
>
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or 
> DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> Here is the code from ThreadsProcessor. In case of DiscardPolicy or 
> DiscardOldestPolicy executorService will no throw RejectedExecutionException, 
> so exchange remains unprocessed and count of inflight exchanges will not be 
> decremented for such discarded exchanges.
> {code:java|title=ThreadsProcessor#process(Exchange, AsyncCallback)}
> public boolean process(Exchange exchange, AsyncCallback callback) {
>     if (shutdown.get()) {
>         throw new IllegalStateException("ThreadsProcessor is not running.");
>     }
>     ProcessCall call = new ProcessCall(exchange, callback);
>     try {
>         executorService.submit(call);
>         // tell Camel routing engine we continue routing asynchronous
>         return false;
>     } catch (RejectedExecutionException e) {
>         if (isCallerRunsWhenRejected()) {
>             if (shutdown.get()) {
>                 exchange.setException(new RejectedExecutionException());
>             } else {
>                 callback.done(true);
>             }
>         } else {
>             exchange.setException(e);
>         }
>         return true;
>     }
> }
> {code}
> Unit test is attached.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to