[ 
https://issues.apache.org/jira/browse/CAMEL-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567902#comment-16567902
 ] 

Dmitry Volodin commented on CAMEL-12668:
----------------------------------------

I've done a research and detects that consumer with streaming capabilities has 
a wrong shutdown life-cycle if it doesn't implements 
org.apache.camel.Suspendable interface, i.e. RejectableThreadPoolExecutor 
shutdown before AggregateProcessor complete.

2018-08-02 10:37:46,038 [lication.main()] INFO  CamelApplication               
- Starting Context shut down
2018-08-02 10:37:46,039 [lication.main()] INFO  DefaultCamelContext            
- Apache Camel 2.17.0 (CamelContext: camel-1) is shutting down
2018-08-02 10:37:46,040 [lication.main()] INFO  DefaultShutdownStrategy        
- Starting to graceful shutdown 1 routes (timeout 300 seconds)
2018-08-02 10:37:46,041 [lication.main()] DEBUG DefaultExecutorServiceManager  
- Created new ThreadPool for source: 
org.apache.camel.impl.DefaultShutdownStrategy@4844cb37 with name: ShutdownTask. 
-> 
org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@7be90f7b[Running, 
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 
0][ShutdownTask]
2018-08-02 10:37:46,048 [ - ShutdownTask] DEBUG DefaultShutdownStrategy        
- There are 1 routes to shutdown
2018-08-02 10:37:46,049 [ - ShutdownTask] DEBUG DefaultExecutorServiceManager  
- Forcing shutdown of ExecutorService: 
org.apache.camel.util.concurrent.RejectableThreadPoolExecutor@3598612a[Running, 
pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 
1][stream://file?fileName=src%2Fmain%2Fresources%2Ftest.txt]
2018-08-02 10:37:46,051 [ - ShutdownTask] DEBUG DefaultManagementAgent         
- Unregistered MBean with ObjectName: 
org.apache.camel:context=camel-1,type=threadpools,name="StreamConsumer(0x19584fb0)"
2018-08-02 10:37:46,052 [ - ShutdownTask] DEBUG StreamConsumer                 
- Stopping consumer: 
Consumer[stream://file?fileName=src%2Fmain%2Fresources%2Ftest.txt]
2018-08-02 10:37:46,053 [ - ShutdownTask] INFO  DefaultShutdownStrategy        
- Route: route1 shutdown complete, was consuming from: 
Endpoint[stream://file?fileName=src%2Fmain%2Fresources%2Ftest.txt]
2018-08-02 10:37:46,054 [ - ShutdownTask] DEBUG AggregateProcessor             
- Aggregation complete for correlation key true sending aggregated exchange: 
Exchange[]
2018-08-02 10:37:46,057 [ - ShutdownTask] DEBUG AggregateProcessor             
- Processing aggregated exchange: 
Exchange[ID-dvolodin-redhat-local-34899-1533195455487-0-3]
2018-08-02 10:37:46,063 [ - ShutdownTask] WARN  AggregateProcessor             
- Error processing aggregated exchange. 
Exchange[ID-dvolodin-redhat-local-34899-1533195455487-0-3]. Caused by: 
[java.util.concurrent.RejectedExecutionException - null]

For example, for camel-stream component add Suspendable marker implementation 
solves the problem. But the problem can be more common for other similar 
components. [~davsclaus], [~ancosen] what do you think about potential fix: 
check for components with streaming capabilities and add Suspendable marker 
implementation or continue investigate the core problem related to CAMEL-9577 
fix?

 

 

> Aggregate with forcecompleteonstop doesn't seem to be honored
> -------------------------------------------------------------
>
>                 Key: CAMEL-12668
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12668
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.17.0
>            Reporter: Chirag Anand
>            Assignee: Dmitry Volodin
>            Priority: Major
>             Fix For: 2.22.1
>
>
> I have a camel project where I am using twitter component of the camel and 
> using the streaming/filter endpoint. The route is something like 
> from(<twitterstreamendpoint>).aggregate(<customstratergy>).completionSize().forceCompletionOnStop().process().
>  The problem here is when I stop the context the remaining aggregated 
> exchanges are not being flushed to the processor. Earlier I was using version 
> 2.16.2 and it worked fine, but with the latest release i.e 2.22.0 I am facing 
> this issue. The configuration hasn’t changed a bit. To be certain i tried it 
> with version 2.17.0 also, doesn't work either.
>  
> I tried debugging and I noticed at somepoint there is a 
> RejectedExecutionException, not sure why because there is no change in the 
> configuration.
>  
> Can you please resolve this at the earliest? or at least tell me if there has 
> been any change in the functionality/usage?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to