Hi Can you log a JIRA, we need to take a look at that shutdown so the thread pools are stopped after its done its work.
On Wed, Apr 26, 2017 at 6:18 PM, blove319 <[email protected]> wrote: > Camel versions tested: 2.16 - 2.18.3 > Current Maven dependencies: > org.apache.camel:camel-test 2.18.1 > org.apache.camel:camel-sjms 2.17.3 > > SHORT VERSION: > When using SJMS (et al?) with an aggregator and/or splitter in the route, > shutting down either throws an error or tosses out messages. > > > LONG VERSION: > > When using an SJMS consumer to consume from a queue, with a route that has > an aggregator in it, I inevitably lose messages when the route stops. > > The two obvious documented aggregator modifiers do not work: > > *forceCompletionOnStop* - results in a RejectedExecutionException error > because the underlying thread pools are stopped/closed before the > "prepareShutdown" method is called on the aggregator (which is when the > outstanding aggregations are forced to complete and the results are handed > to the route for processing). > > *completeAllOnStop* - results in the route logging the number of outstanding > messages every second (the number never changes) until the (500 second?) > timeout is reached, at which point the route is forced to shut down and the > messages are tossed out. Presumably because there is no active thread pool > available to handle the messages. > > Without either of these two modifiers on the aggregator, it just tosses out > any unfinished aggregations on shutdown. > > Here's a sample test... It probably isn't ideally written, but it does > illustrate the issue... > > > import org.apache.activemq.junit.EmbeddedActiveMQBroker; > import org.apache.camel.CamelContext; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.ProducerTemplate; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.component.sjms.SjmsComponent; > import org.apache.camel.impl.DefaultCamelContext; > import org.apache.camel.util.toolbox.AggregationStrategies; > import org.junit.Rule; > import org.junit.Test; > > /** > * Created by bryan.love on 4/25/17. > */ > public class SjmsBatchTest { > @Rule > public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker(); > CamelContext context = new DefaultCamelContext(); > ProducerTemplate template = context.createProducerTemplate(); > > @Test > public void testBatch() throws Exception { > SjmsComponent comp = new SjmsComponent(); > comp.setConnectionFactory(broker.createConnectionFactory()); > context.addComponent("sjms", comp); > //context.setShutdownStrategy(new MyShutdownStrategy(context)); > > RouteBuilder rb = new RouteBuilder() { > @Override > public void configure() throws Exception { > from("sjms:queue:test-in") > .aggregate(header("CamelFileName"), > AggregationStrategies.groupedExchange()) > .id("fileNameAggProcessor") > .completionInterval(10000) // wait $b > .completionSize(50) // wait for $batchSize > messages to aggregate > .forceCompletionOnStop() > .filter(header("CamelFileName").isNotNull()) > .process(new Processor() { > @Override > public void process(Exchange exchange) throws Exception > { > System.out.println("foo"); > } > }); > } > }; > context.addRoutes(rb); > > context.start(); > template.setDefaultEndpointUri("sjms:queue:test-in"); > template.sendBodyAndHeader("some body", "CamelFileName", > "someFileName"); > Thread.sleep(1000); > context.stop(); > } > } > > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335.html > Sent from the Camel Development mailing list archive at Nabble.com. -- Claus Ibsen ----------------- http://davsclaus.com @davsclaus Camel in Action 2: https://www.manning.com/ibsen2
