Majid Hajibaba created STORM-2025:
-------------------------------------
Summary: dropping messages in withTumblingWindow
Key: STORM-2025
URL: https://issues.apache.org/jira/browse/STORM-2025
Project: Apache Storm
Issue Type: Bug
Components: storm-core
Affects Versions: 1.0.1
Environment: ubuntu 14.0.1 LTS
Reporter: Majid Hajibaba

When I use withTumblingWindow and processing a window takes longer than the
rate at which input messages arrive, the bolt does not receive all of the
input messages; some of them are dropped.

    int count = 0;

    @Override
    public void execute(TupleWindow inputWindow) {
        try {
            List<Event> windowEvenets = new ArrayList<>();
            for (Tuple tuple : inputWindow.get()) {
                if (tuple.getSourceComponent().equals("KafkaSpout")) {
                    count++;
                    windowEvenets.add(ScenarioUtils.convertToKlugEvent(tuple.getString(0)));
                }
            }
            logger.info(count + "======= Process event ");
            Thread.sleep(4000);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1 /*kafkaSpoutCount*/);
    //builder.setSpout("KafkaSpout", new MyKafkaSpout("correlateTest"), 1);
    builder.setBolt("WindowInputTest",
            new WindowInputTest(zookeeperHosts)
                    .withTumblingWindow(new BaseWindowedBolt.Duration(4, TimeUnit.SECONDS)),
            1).shuffleGrouping("KafkaSpout");
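
For reference, the behaviour expected from a tumbling window can be sketched without Storm. The snippet below is only an illustration, not Storm's implementation: the class name TumblingWindowSketch, the method countPerWindow, and the sample arrival times are all hypothetical, and it assumes tuples are bucketed purely by arrival time. The point is that the windows do not overlap and leave no gaps, so the per-window counts should add up to the number of messages sent.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Storm-independent model of a time-based tumbling window.
public class TumblingWindowSketch {

    // Assigns each arrival time (in ms) to exactly one window of length
    // windowMillis and returns the number of tuples per window index.
    static Map<Long, Integer> countPerWindow(long[] arrivalMillis, long windowMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : arrivalMillis) {
            long windowIndex = t / windowMillis; // non-overlapping, gap-free buckets
            counts.merge(windowIndex, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up arrival times, 4-second windows as in the topology above.
        long[] arrivals = {0, 1000, 3999, 4000, 7500, 8000};
        Map<Long, Integer> counts = countPerWindow(arrivals, 4000);

        int total = 0;
        for (int c : counts.values()) {
            total += c;
        }
        System.out.println("per window: " + counts);
        System.out.println("sent=" + arrivals.length + " windowed=" + total);
    }
}
```

In the topology above, the final value of count in the bolt's log plays the role of the windowed total, and it ends up lower than the number of messages published to Kafka.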