[
https://issues.apache.org/jira/browse/CAMEL-4925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13190435#comment-13190435
]
Claus Ibsen commented on CAMEL-4925:
------------------------------------
After looking into this for a bit: the JDK does *not* offer a good API for
running custom logic on rejection that also gives you access to the inner
details of the runnable task you submitted to the thread pool. For
DiscardOldest this means you do *not* know which Exchange is about to be
discarded, because you cannot get access to it. The JDK ThreadPoolExecutor
wraps the runnable/FutureTask using an adapter, and that adapter does not
expose any API for getting at the Exchange.
Even if you create a custom FutureTask and submit that, it is the adapter you
are handed when the task is rejected, and the adapter gives you no way to reach
your custom FutureTask.
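To illustrate the wrapping described above, here is a minimal sketch using only the JDK. ExchangeTask and its exchangeId field are hypothetical stand-ins for a task that carries an Exchange; they are not Camel classes. The sketch fills a one-thread pool with a one-slot queue and records what the rejection handler receives for the third submission.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectedTaskWrapperDemo {

    // Hypothetical stand-in for a task that carries an Exchange (not a Camel class).
    static final class ExchangeTask implements Runnable {
        final String exchangeId;

        ExchangeTask(String exchangeId) {
            this.exchangeId = exchangeId;
        }

        @Override
        public void run() {
            // no-op: only the identity of the task matters here
        }
    }

    // Returns the object that the rejection handler was given for the third submission.
    static Runnable captureRejected() {
        AtomicReference<Runnable> rejected = new AtomicReference<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                (task, executor) -> rejected.set(task)); // custom handler: just record the task
        try {
            // 1st task occupies the single worker until the latch is released
            pool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.submit(new ExchangeTask("queued"));   // 2nd task fills the queue
            pool.submit(new ExchangeTask("rejected")); // 3rd task is rejected
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        Runnable r = captureRejected();
        System.out.println("handler received: " + r.getClass().getName());
        System.out.println("is ExchangeTask:  " + (r instanceof ExchangeTask));
    }
}
```

The handler is given the FutureTask that submit() created, not the ExchangeTask instance, and FutureTask has no public accessor for the runnable it wraps. The sketch covers submit() only, which is the call used in the ThreadsProcessor code quoted in the issue.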
So the best we can do is to not support DiscardOldest, and to handle Abort and
Discard in the ThreadsProcessor itself. There we can mark the exchange to stop
routing, so that the exchange is completed, which means it is unregistered
from the inflight registry and so on.
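A rough sketch of that idea, using plain JDK types only. FakeExchange, Policy and the process method below are hypothetical illustrations and not the Camel API or the actual fix; the assumption is that the pool itself always uses AbortPolicy, so every rejection surfaces as a RejectedExecutionException in the caller, which then applies the configured policy and completes the exchange.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionInCallerSketch {

    // Hypothetical policy enum; DiscardOldest is deliberately absent.
    enum Policy { ABORT, DISCARD }

    // Hypothetical, heavily simplified stand-in for an exchange.
    static final class FakeExchange {
        volatile Exception exception; // set only when the policy is ABORT
        volatile boolean routeStop;   // "stop routing" marker
        volatile boolean done;        // true once the exchange has been completed
    }

    // Returns false when the work continues asynchronously, true when it finished in the caller.
    static boolean process(ExecutorService executor, Policy policy,
                           FakeExchange exchange, Runnable work) {
        try {
            executor.submit(() -> {
                work.run();
                exchange.done = true;
            });
            return false;
        } catch (RejectedExecutionException e) {
            if (policy == Policy.ABORT) {
                exchange.exception = e; // Abort: report the rejection as a failure
            }
            // Abort and Discard both stop routing and complete the exchange,
            // so it would no longer be counted as inflight.
            exchange.routeStop = true;
            exchange.done = true;
            return true;
        }
    }

    // Runs process() against a pool that rejects everything and returns the resulting exchange.
    static FakeExchange demo(Policy policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.AbortPolicy());
        pool.shutdown(); // a shut-down pool rejects every new submission
        FakeExchange exchange = new FakeExchange();
        process(pool, policy, exchange, () -> { });
        return exchange;
    }

    public static void main(String[] args) {
        FakeExchange aborted = demo(Policy.ABORT);
        FakeExchange discarded = demo(Policy.DISCARD);
        System.out.println("abort:   exception=" + aborted.exception + ", done=" + aborted.done);
        System.out.println("discard: exception=" + discarded.exception + ", done=" + discarded.done);
    }
}
```

Because the caller still holds the exchange when the exception is thrown, it never needs to unwrap the rejected task, which is the part the JDK does not make possible.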
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or
> DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: CAMEL-4925
> URL: https://issues.apache.org/jira/browse/CAMEL-4925
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.8.0
> Reporter: Sergey Zhemzhitsky
> Assignee: Claus Ibsen
> Priority: Minor
> Fix For: 2.10.0
>
> Attachments: CamelRoutingTest.java
>
>
> ThreadsProcessor configured with ExecutorService with DiscardPolicy or
> DiscardOldestPolicy leaves inflight exchanges for discarded tasks unprocessed.
> Here is the code from ThreadsProcessor. With DiscardPolicy or
> DiscardOldestPolicy the executorService will not throw RejectedExecutionException,
> so the exchange remains unprocessed and the count of inflight exchanges is not
> decremented for such discarded exchanges.
> {code:java|title=ThreadsProcessor#process(Exchange, AsyncCallback)}
> public boolean process(Exchange exchange, AsyncCallback callback) {
>     if (shutdown.get()) {
>         throw new IllegalStateException("ThreadsProcessor is not running.");
>     }
>     ProcessCall call = new ProcessCall(exchange, callback);
>     try {
>         executorService.submit(call);
>         // tell Camel routing engine we continue routing asynchronous
>         return false;
>     } catch (RejectedExecutionException e) {
>         if (isCallerRunsWhenRejected()) {
>             if (shutdown.get()) {
>                 exchange.setException(new RejectedExecutionException());
>             } else {
>                 callback.done(true);
>             }
>         } else {
>             exchange.setException(e);
>         }
>         return true;
>     }
> }
> {code}
> Unit test is attached.