Nikos Katsipoulakis created STORM-3611:
------------------------------------------
Summary: IllegalArgumentException thrown on windowed bolt
Key: STORM-3611
URL: https://issues.apache.org/jira/browse/STORM-3611
Project: Apache Storm
Issue Type: Bug
Components: storm-client
Affects Versions: 2.1.0
Environment: Ubuntu Linux 16.04, OpenJDK version 1.8.0_242
Reporter: Nikos Katsipoulakis
Fix For: 2.1.0
The following topology, which contains a single `BaseWindowedBolt`, reproduces
the problem:
{code:java}
/**
 * Reproduction topology: one spout ("groups") feeding one windowed bolt ("mean").
 */
public class GroupMean extends ConfigurableTopology {
/**
 * Entry point; passes a new GroupMean instance and the command-line
 * arguments to ConfigurableTopology.start.
 */
public static void main(String[] args) throws Exception {
ConfigurableTopology.start(new GroupMean(), args);
}
/**
 * Wires the spout and the windowed bolt together and submits the topology
 * under the name "group-mean-topo".
 *
 * @param args command-line arguments (not read by this method)
 * @return the value returned by submit(...)
 */
@Override
protected int run(String[] args) throws Exception {
// The timestamp extractor reads each tuple's first field (index 0) as a long.
// Tumbling window of Duration.of(100) -- presumably milliseconds; TODO confirm
// against the Storm API.
BaseWindowedBolt bolt = new GroupMeanBolt()
.withTimestampExtractor(x -> x.getLong(0))
.withTumblingWindow(BaseWindowedBolt.Duration.of(100));
TopologyBuilder builder = new TopologyBuilder();
// The trailing 1 on setSpout/setBolt is presumably the parallelism hint -- confirm.
builder.setSpout("groups", new RandomGroupNumberSpout(), 1);
// "mean" subscribes to the "groups" component via shuffle grouping.
builder.setBolt("mean", bolt, 1).shuffleGrouping("groups");
Config config = new Config();
config.put("topology.name", "group-mean-topo");
return submit("group-mean-topo", config, builder);
}
}
{code}
where `GroupMeanBolt` is defined as follows:
{code:java}
/**
 * Windowed bolt that, for every non-empty window, computes the mean of the
 * "value" field per "key" and emits one tuple summarising the window.
 *
 * NOTE(review): this class, as shown, does not override declareOutputFields.
 * If the base class declares no output stream either, the emit in execute()
 * targets an undeclared stream, which would be consistent with the reported
 * "Unknown stream ID: default" -- confirm against BaseWindowedBolt.
 */
public class GroupMeanBolt extends BaseWindowedBolt {
// Collector handed over in prepare(); used by execute() to emit results.
private OutputCollector collector;
/**
 * Stores the collector for later use; conf and context are not used.
 */
@Override
public void prepare(Map<String, Object> conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
/**
 * Aggregates the tuples of the given window and emits a three-element tuple:
 * window start timestamp, window end timestamp, and a comma-separated list
 * of "key:mean" tokens. Nothing is emitted for an empty window.
 *
 * @param inputWindow the window whose tuples are aggregated
 */
@Override
public void execute(TupleWindow inputWindow) {
if (inputWindow.get().size() > 0) {
// Per-key running sum of "value" and per-key tuple count.
Map<Integer, Double> sum = new HashMap<>();
Map<Integer, Integer> histogram = new HashMap<>();
for (Tuple t : inputWindow.get()) {
Integer key = t.getIntegerByField("key");
Double value = t.getDoubleByField("value");
sum.compute(key, (k, v) -> v == null ? value : v + value);
histogram.compute(key, (k, v) -> v == null ? 1 : v + 1);
}
// mean = sum / count for each key seen in this window.
Map<Integer, Number> result = new HashMap<>(sum.size());
for (Map.Entry<Integer, Double> e : sum.entrySet()) {
result.put(e.getKey(), e.getValue() / histogram.get(e.getKey()));
}
// Output values: [start timestamp, end timestamp, "key:mean,key:mean,..."].
ArrayList<Object> out = new ArrayList<>(3);
out.add(inputWindow.getStartTimestamp());
out.add(inputWindow.getEndTimestamp());
ArrayList<String> tokens = new ArrayList<>(result.size());
for (Map.Entry<Integer, Number> e : result.entrySet()) {
String token = String.join(":", e.getKey().toString(),
e.getValue().toString());
tokens.add(token);
}
out.add(String.join(",", tokens));
// Emits with only the value list, no explicit stream id; the stack trace
// in this report shows this call failing with "Unknown stream ID: default".
collector.emit(out);
}
}
}
{code}
At runtime, an IllegalArgumentException is thrown because the "default" stream
is not recognized. Stack trace:
{code:java}
2020-03-27 15:42:42.705 o.a.s.w.WaterMarkEventGenerator
watermark-event-generator-0 [ERROR] Failed while processing watermark event
java.lang.IllegalArgumentException: Unknown stream ID: default
at org.apache.storm.daemon.Task.getOutgoingTasks(Task.java:164)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:88)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor$WindowedOutputCollector.emit(WindowedBoltExecutor.java:403)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:88)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at gr.katsip.spear.GroupMeanBolt.execute(GroupMeanBolt.java:51)
~[stormjar.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor.boltExecute(WindowedBoltExecutor.java:370)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor$1.onActivation(WindowedBoltExecutor.java:363)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WindowManager.onTrigger(WindowManager.java:156)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WatermarkTimeTriggerPolicy.handleWaterMarkEvent(WatermarkTimeTriggerPolicy.java:73)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WatermarkTimeTriggerPolicy.track(WatermarkTimeTriggerPolicy.java:43)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WindowManager.track(WindowManager.java:185)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.windowing.WindowManager.add(WindowManager.java:121)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WaterMarkEventGenerator.run(WaterMarkEventGenerator.java:88)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_242]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[?:1.8.0_242]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_242]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[?:1.8.0_242]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_242]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_242]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
2020-03-27 15:42:42.715 o.a.s.w.WaterMarkEventGenerator
Thread-14-mean-executor[3, 3] [ERROR] Got exception
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException:
Unknown stream ID: default
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
~[?:1.8.0_242]
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
~[?:1.8.0_242]
at
org.apache.storm.windowing.WaterMarkEventGenerator.checkFailures(WaterMarkEventGenerator.java:115)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WaterMarkEventGenerator.track(WaterMarkEventGenerator.java:79)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:308)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:234)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.executor.Executor.accept(Executor.java:275)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:131)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:111)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:171)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:158)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.utils.Utils$1.run(Utils.java:392)
[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.lang.IllegalArgumentException: Unknown stream ID: default
at org.apache.storm.daemon.Task.getOutgoingTasks(Task.java:164)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:88)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor$WindowedOutputCollector.emit(WindowedBoltExecutor.java:403)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:88)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at gr.katsip.spear.GroupMeanBolt.execute(GroupMeanBolt.java:51)
~[stormjar.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor.boltExecute(WindowedBoltExecutor.java:370)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.topology.WindowedBoltExecutor$1.onActivation(WindowedBoltExecutor.java:363)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WindowManager.onTrigger(WindowManager.java:156)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WatermarkTimeTriggerPolicy.handleWaterMarkEvent(WatermarkTimeTriggerPolicy.java:73)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WatermarkTimeTriggerPolicy.track(WatermarkTimeTriggerPolicy.java:43)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WindowManager.track(WindowManager.java:185)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at org.apache.storm.windowing.WindowManager.add(WindowManager.java:121)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
org.apache.storm.windowing.WaterMarkEventGenerator.run(WaterMarkEventGenerator.java:88)
~[storm-client-2.1.1-SNAPSHOT.jar:2.1.1-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_242]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
~[?:1.8.0_242]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_242]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
~[?:1.8.0_242]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_242]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_242]
... 1 more
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)