[ 
https://issues.apache.org/jira/browse/CAMEL-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554012#comment-16554012
 ] 

Chirag Anand commented on CAMEL-12668:
--------------------------------------

Hi [~dmvolod], Thank you for responding. I tried the same thing using file 
component and it seemed to work fine but doesn't work when using the twitter 
component. I am testing this with Camel version 2.17.0.

 

The route that i am using for twitter component is as such -

public class CamelRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        
from("twitter://streaming/filter?accessToken=xxx&accessTokenSecret=xxx&consumerKey=xxx&consumerSecret=xxx&keywords=XYZ&type=event")
                .aggregate(constant(true), new 
CamelAggregator()).completionSize(50)
                .forceCompletionOnStop().process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        List<Object> statusObjects = 
exchange.getIn().getBody(List.class);
                        System.out.println(statusObjects.size());
                    }
                }).end();
    }

}

 

The custom aggregation logic is as such -

public class CamelAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody(Object.class);
        List<Object> list = null;

        if (oldExchange == null) {
            list = new ArrayList<>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list = oldExchange.getIn().getBody(ArrayList.class);
            list.add(newBody);
            return oldExchange;
        }
    }
}

 

The route which succeeded for the file component is as such -

public class CamelFileRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("file://C:/Test")
        .aggregate(constant(true), new CamelAggregator()).completionSize(50)
        .forceCompletionOnStop().process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                List<Object> statusObjects = 
exchange.getIn().getBody(List.class);
                System.out.println(statusObjects.size());
            }
        }).end();
    }

}

 

Please tell me if i am making any mistake here or if there is any problem with 
the twitter component only?

> Aggregate with forcecompleteonstop doesn't seem to be honored
> -------------------------------------------------------------
>
>                 Key: CAMEL-12668
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12668
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.17.0
>            Reporter: Chirag Anand
>            Priority: Major
>             Fix For: 2.22.0
>
>
> I have a camel project where I am using twitter component of the camel and 
> using the streaming/filter endpoint. The route is something like 
> from(<twitterstreamendpoint>).aggregate(<customstratergy>).completionSize().forceCompletionOnStop().process().
>  The problem here is when I stop the context the remaining aggregated 
> exchanges are not being flushed to the processor. Earlier I was using version 
> 2.16.2 and it worked fine, but with the latest release i.e 2.22.0 I am facing 
> this issue. The configuration hasn’t changed a bit. To be certain i tried it 
> with version 2.17.0 also, doesn't work either.
>  
> I tried debugging and I noticed at somepoint there is a 
> RejectedExecutionException, not sure why because there is no change in the 
> configuration.
>  
> Can you please resolve this at the earliest? or at least tell me if there has 
> been any change in the functionality/usage?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to