James Xu created STORM-98:
-----------------------------
Summary: .stateQuery twice halts tuple execution?
Key: STORM-98
URL: https://issues.apache.org/jira/browse/STORM-98
Project: Apache Storm (Incubating)
Issue Type: Improvement
Reporter: James Xu
Priority: Minor
https://github.com/nathanmarz/storm/issues/310
In the following example, the .aggregate() step is never executed:
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
            new Values("cow"),
            new Values("candy"),
            new Values("year"));
    spout.setCycle(true);
    TridentTopology topology = new TridentTopology();
    TridentState urlToTweeters = topology.newStaticState(
            new TridentReach.StaticSingleKeyMapState.Factory(TridentReach.TWEETERS_DB));
    Stream wordStream = topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .stateQuery(urlToTweeters, new Fields("word"), new MapGet(), new Fields("output1"))
            .groupBy(new Fields("word"))
            .stateQuery(urlToTweeters, new Fields("word"), new MapGet(), new Fields("output2"))
            .aggregate(new Fields("word"), new PrintAggregator(), new Fields("count"));
PrintAggregator:
    public static class PrintAggregator extends BaseAggregator<PrintAggregator.State> {
        static class State {
            int counter = 0;
        }

        @Override
        public State init(Object o, TridentCollector collector) {
            return new State();
        }

        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            state.counter++;
            System.out.println(tuple.getString(0) + " is on: " + state.counter);
        }

        @Override
        public void complete(State state, TridentCollector collector) {
            collector.emit(new Values(state.counter));
        }
    }
----------
nathanmarz: Trident currently doesn't support recursive topologies. In this
case, the output of a state feeds back into a query on the same state. You
can work around this by creating two separate static state instances for
urlToTweeters.
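To make the restriction concrete, here is a toy sketch in plain Java. It is not Trident code and does not reflect how Trident builds topologies: the Step type, the findRequeriedStates helper, and the state ids urlToTweetersA and urlToTweetersB are all hypothetical. It only assumes the rule stated in the comment above, namely that a state instance must not be queried again downstream of an earlier query on that same instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only. Nothing here is Storm or Trident API; all names are hypothetical.
final class StateReuseCheck {

    // One step of a linear pipeline. stateId is null when the step queries no state.
    static final class Step {
        final String name;
        final String stateId;

        Step(String name, String stateId) {
            this.name = name;
            this.stateId = stateId;
        }
    }

    // Returns the ids of state instances that are queried again downstream of an
    // earlier query on the same instance, in the order the repeat queries appear.
    static List<String> findRequeriedStates(List<Step> pipeline) {
        Set<String> seen = new HashSet<>();
        List<String> requeried = new ArrayList<>();
        for (Step step : pipeline) {
            if (step.stateId != null && !seen.add(step.stateId)) {
                requeried.add(step.stateId);
            }
        }
        return requeried;
    }

    // Shape of the pipeline from the report: both queries share one state instance.
    static List<Step> reportedPipeline() {
        return Arrays.asList(
                new Step("each", null),
                new Step("stateQuery", "urlToTweeters"),
                new Step("groupBy", null),
                new Step("stateQuery", "urlToTweeters"),
                new Step("aggregate", null));
    }

    // Shape of the suggested workaround: each query gets its own state instance.
    static List<Step> workaroundPipeline() {
        return Arrays.asList(
                new Step("each", null),
                new Step("stateQuery", "urlToTweetersA"),
                new Step("groupBy", null),
                new Step("stateQuery", "urlToTweetersB"),
                new Step("aggregate", null));
    }

    public static void main(String[] args) {
        System.out.println("reported:   " + findRequeriedStates(reportedPipeline()));
        System.out.println("workaround: " + findRequeriedStates(workaroundPipeline()));
    }
}
```

In the actual topology, the corresponding change would presumably be to call topology.newStaticState(...) twice with the same factory and pass a different TridentState to each .stateQuery() call.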
--
This message was sent by Atlassian JIRA
(v6.1.4#6159)