[ 
https://issues.apache.org/jira/browse/STORM-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Majid Hajibaba updated STORM-2025:
----------------------------------
    Description: 
when i use withTumblingWindow and process the input messages, if the processing 
time is longer than input rate, we will not get all input messages.

{code}
int count=0;
        @Override
        public void execute(TupleWindow inputWindow) {
                try {
                        List<Event> windowEvenets = new ArrayList<>();
                        for(Tuple tuple: inputWindow.get()) {
                                if 
(tuple.getSourceComponent().equals("KafkaSpout")) {
                                        count++;
                                        
windowEvenets.add(ScenarioUtils.convertToKlugEvent(tuple.getString(0)));
                                }
                        }
                        logger.info(count + "======= Process event ");
                        Thread.sleep(4000);
                }
                catch (Exception ex) {
                        ex.printStackTrace();
                }
        }
{code}

The topology is as follow:
{code}
 TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
            builder.setBolt("WindowInputTest", new 
WindowInputTest(zookeeperHosts).withTumblingWindow(new 
BaseWindowedBolt.Duration(4,TimeUnit.SECONDS)), 
1).shuffleGrouping("KafkaSpout");
{code}

  was:
when i use withTumblingWindow and process the input messages, if the processing 
time is longer than input rate, we will not get all input messages.

int count=0;
        @Override
        public void execute(TupleWindow inputWindow) {
                try {
                        List<Event> windowEvenets = new ArrayList<>();
                        for(Tuple tuple: inputWindow.get()) {
                                if 
(tuple.getSourceComponent().equals("KafkaSpout")) {
                                        count++;
                                        
windowEvenets.add(ScenarioUtils.convertToKlugEvent(tuple.getString(0)));
                                }
                        }
                        logger.info(count + "======= Process event ");
                        Thread.sleep(4000);
                }
                catch (Exception ex) {
                        ex.printStackTrace();
                }
        }



 TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
            builder.setBolt("WindowInputTest", new 
WindowInputTest(zookeeperHosts).withTumblingWindow(new 
BaseWindowedBolt.Duration(4,TimeUnit.SECONDS)), 
1).shuffleGrouping("KafkaSpout");



> dropping messages in withTumblingWindow
> ---------------------------------------
>
>                 Key: STORM-2025
>                 URL: https://issues.apache.org/jira/browse/STORM-2025
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 1.0.1
>         Environment: ubuntu 14.0.1 LTS
>            Reporter: Majid Hajibaba
>
> when i use withTumblingWindow and process the input messages, if the 
> processing time is longer than input rate, we will not get all input messages.
> {code}
> int count=0;
>       @Override
>       public void execute(TupleWindow inputWindow) {
>               try {
>                       List<Event> windowEvenets = new ArrayList<>();
>                       for(Tuple tuple: inputWindow.get()) {
>                               if 
> (tuple.getSourceComponent().equals("KafkaSpout")) {
>                                       count++;
>                                       
> windowEvenets.add(ScenarioUtils.convertToKlugEvent(tuple.getString(0)));
>                               }
>                       }
>                       logger.info(count + "======= Process event ");
>                       Thread.sleep(4000);
>               }
>               catch (Exception ex) {
>                       ex.printStackTrace();
>               }
>       }
> {code}
> The topology is as follow:
> {code}
>  TopologyBuilder builder = new TopologyBuilder();
>             builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
>             builder.setBolt("WindowInputTest", new 
> WindowInputTest(zookeeperHosts).withTumblingWindow(new 
> BaseWindowedBolt.Duration(4,TimeUnit.SECONDS)), 
> 1).shuffleGrouping("KafkaSpout");
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to