[ https://issues.apache.org/jira/browse/CAMEL-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554012#comment-16554012 ]
Chirag Anand commented on CAMEL-12668:
--------------------------------------
Hi [~dmvolod], thank you for responding. I tried the same thing with the file
component and it seemed to work fine, but it does not work with the twitter
component. I am testing this with Camel version 2.17.0.
The route that I am using for the twitter component is as follows:
public class CamelRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("twitter://streaming/filter?accessToken=xxx&accessTokenSecret=xxx&consumerKey=xxx&consumerSecret=xxx&keywords=XYZ&type=event")
            .aggregate(constant(true), new CamelAggregator()).completionSize(50)
            .forceCompletionOnStop()
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    List<Object> statusObjects = exchange.getIn().getBody(List.class);
                    System.out.println(statusObjects.size());
                }
            }).end();
    }
}
The custom aggregation strategy is as follows:
public class CamelAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody(Object.class);
        List<Object> list = null;
        if (oldExchange == null) {
            list = new ArrayList<>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list = oldExchange.getIn().getBody(ArrayList.class);
            list.add(newBody);
            return oldExchange;
        }
    }
}
The route that worked with the file component is as follows:
public class CamelFileRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file://C:/Test")
            .aggregate(constant(true), new CamelAggregator()).completionSize(50)
            .forceCompletionOnStop()
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    List<Object> statusObjects = exchange.getIn().getBody(List.class);
                    System.out.println(statusObjects.size());
                }
            }).end();
    }
}
Please let me know whether I am making a mistake here, or whether the problem
is specific to the twitter component.
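For reference, the behaviour expected from completionSize(50) combined with
forceCompletionOnStop() can be sketched without Camel. This is only an
illustration of the expectation, not how Camel implements it: BatchingSketch
and its add, stop and run methods are hypothetical names made up for this
example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the aggregate step; not part of Camel.
class BatchingSketch {
    private final int completionSize;
    private final Consumer<List<Object>> downstream;
    private List<Object> current = new ArrayList<>();

    BatchingSketch(int completionSize, Consumer<List<Object>> downstream) {
        this.completionSize = completionSize;
        this.downstream = downstream;
    }

    // Mirrors completionSize(n): emit the batch once it holds n bodies.
    void add(Object body) {
        current.add(body);
        if (current.size() == completionSize) {
            flush();
        }
    }

    // Mirrors the expectation behind forceCompletionOnStop():
    // whatever is still pending is emitted when the context stops.
    void stop() {
        if (!current.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        List<Object> batch = current;
        current = new ArrayList<>();
        downstream.accept(batch);
    }

    // Feeds `messages` bodies through the sketch, stops it, and returns
    // the size of every batch that reached the downstream consumer.
    static List<Integer> run(int completionSize, int messages) {
        List<Integer> sizes = new ArrayList<>();
        BatchingSketch sketch =
                new BatchingSketch(completionSize, batch -> sizes.add(batch.size()));
        for (int i = 0; i < messages; i++) {
            sketch.add("status-" + i);
        }
        sketch.stop();
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(run(50, 120)); // prints [50, 50, 20]
    }
}
```

With the twitter route, the final partial batch (the 20 in this example) is
the one that never reaches the processor when the context is stopped.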
> Aggregate with forcecompleteonstop doesn't seem to be honored
> -------------------------------------------------------------
>
> Key: CAMEL-12668
> URL: https://issues.apache.org/jira/browse/CAMEL-12668
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.17.0
> Reporter: Chirag Anand
> Priority: Major
> Fix For: 2.22.0
>
>
> I have a Camel project that uses the twitter component with the
> streaming/filter endpoint. The route is something like
> from(<twitterstreamendpoint>).aggregate(<customstrategy>).completionSize().forceCompletionOnStop().process().
> The problem is that when I stop the context, the remaining aggregated
> exchanges are not flushed to the processor. Earlier I was using version
> 2.16.2 and it worked fine, but with the latest release, i.e. 2.22.0, I am
> facing this issue. The configuration has not changed at all. To be certain,
> I also tried version 2.17.0, and it does not work there either.
>
> While debugging, I noticed that at some point a RejectedExecutionException
> is thrown. I am not sure why, because the configuration has not changed.
>
> Could you please look into this at the earliest, or at least tell me whether
> there has been any change in the functionality or usage?
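
As background on the RejectedExecutionException mentioned above: in plain
Java, one common cause is handing work to an executor after it has been shut
down. Whether that is what happens inside Camel on stop is an assumption and
is not confirmed anywhere in this thread. The sketch below uses only
java.util.concurrent, and RejectedAfterShutdownSketch is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration only; it does not reproduce Camel's shutdown code.
class RejectedAfterShutdownSketch {

    // Returns true if a task handed over after shutdown() is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // from here on, no new tasks are accepted
        try {
            // Stands in for "flush the pending aggregated exchanges".
            executor.execute(() -> System.out.println("flushing pending batch"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdownIsRejected()); // prints true
    }
}
```

If something similar happens on stop, the pending batch would be dropped
instead of reaching the processor, which would match the symptom described.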
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)