[
https://issues.apache.org/jira/browse/STORM-3598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Li updated STORM-3598:
----------------------------
Description:
We encountered an issue with visualization on UI.
{code:java}
2020-03-09 19:59:01.756 o.a.s.d.u.r.StormApiResource qtp1919834117-167291
[ERROR] Failure getting topology visualization
java.lang.NullPointerException: null
at
org.apache.storm.stats.StatsUtil.mergeWithAddPair(StatsUtil.java:1855)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.expandAveragesSeq(StatsUtil.java:2308)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.aggregateAverages(StatsUtil.java:832)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.aggregateBoltStats(StatsUtil.java:731)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.boltStreamsStats(StatsUtil.java:900)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.daemon.ui.UIHelpers.getVisualizationData(UIHelpers.java:1939)
~[storm-webapp-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.daemon.ui.resources.StormApiResource.getTopologyVisualization(StormApiResource.java:423)
~[storm-webapp-2.2.0.y.jar:2.2.0.y]
{code}
This is a bug in the code.
https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L1846-L1858
{code:java}
for (K kk : mm1.keySet()) {
List seq1 = mm1.get(kk);
List seq2 = mm2.get(kk);
List sums = new ArrayList();
for (int i = 0; i < seq1.size(); i++) {
if (seq1.get(i) instanceof Long) {
sums.add(((Number) seq1.get(i)).longValue() +
((Number) seq2.get(i)).longValue());
} else {
sums.add(((Number) seq1.get(i)).doubleValue() +
((Number) seq2.get(i)).doubleValue());
}
}
tmp.put(kk, sums);
}
{code}
It assumes that mm1 and mm2 always have the same keys, which is not true.
And it can be reproduced by my example code:
{code:java}
/**
 * Minimal topology used to reproduce the NullPointerException described in STORM-3598.
 *
 * <p>Two spouts each declare their own stream ("stream1" and "stream2") and emit directly
 * to a different hard-coded task id, while a single bolt with parallelism 2 subscribes to
 * both streams via direct grouping.
 */
public class WordCountTopology extends ConfigurableTopology {
    private static final Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);

    public static void main(String[] args) {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    /**
     * Wires up the two spouts and the bolt, then submits the topology.
     *
     * @param args optional command-line arguments; when present, {@code args[0]} replaces
     *             the default topology name "word-count"
     * @return whatever {@code submit} returns
     */
    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout1", new RandomSpout(1), 1);
        builder.setSpout("spout2", new RandomSpout(2), 1);
        builder.setBolt("bolt", new RandomBolt(), 2)
            .directGrouping("spout1", "stream1")
            .directGrouping("spout2", "stream2");
        String topologyName = "word-count";
        conf.setNumWorkers(3);
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    /**
     * Spout that emits one fixed sentence roughly once per second on its own stream,
     * named "stream" followed by the id passed to the constructor.
     */
    static class RandomSpout extends BaseRichSpout {
        String stream;
        int id;

        public RandomSpout(int id) {
            this.id = id;
            stream = "stream" + id;
        }

        int taskId = 0;
        SpoutOutputCollector collector;

        public void open(Map<String, Object> conf, TopologyContext context,
                         SpoutOutputCollector collector) {
            taskId = context.getThisTaskId();
            this.collector = collector;
        }

        /**
         * Each spout sends its tuples to a different task over a different stream:
         * the spout with id 1 targets task 2, any other spout targets task 3.
         */
        public void nextTuple() {
            LOG.info("emitting {}", id);
            // NOTE(review): task ids 2 and 3 are hard-coded; presumably they are the two
            // bolt tasks in the reporter's deployment -- confirm before reusing elsewhere.
            if (id == 1) {
                Values val = new Values("test a sentence");
                collector.emitDirect(2, stream, val, val);
            } else {
                Values val = new Values("test 2 sentence");
                collector.emitDirect(3, stream, val, val);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while sleeping between emits", e);
                // Restore the interrupt flag so the calling thread can still observe
                // the interruption instead of having it silently swallowed here.
                Thread.currentThread().interrupt();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(stream, new Fields("word"));
        }
    }

    /** Bolt that only logs the source component of every tuple and emits nothing. */
    static class RandomBolt extends BaseBasicBolt {
        public void execute(Tuple input, BasicOutputCollector collector) {
            LOG.info("executing:{}", input.getSourceComponent());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Intentionally empty: this bolt declares no output streams.
        }
    }
}
{code}
In this example, one of the bolt tasks will only receive data from stream1 and
the other bolt task will only receive data from stream2. So in the map lookups below,
{code:java}
List seq1 = mm1.get(kk);
List seq2 = mm2.get(kk);
{code}
seq1 is null if kk is stream1, seq2 is null if kk is stream2.
Other places that aggregate executor stats do not have this problem because
they use different code
https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L502-L513
in which this case has already been taken care of.
was:
We encountered an issue with visualization on UI.
{code:java}
2020-03-09 19:59:01.756 o.a.s.d.u.r.StormApiResource qtp1919834117-167291
[ERROR] Failure getting topology visualization
java.lang.NullPointerException: null
at
org.apache.storm.stats.StatsUtil.mergeWithAddPair(StatsUtil.java:1855)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.expandAveragesSeq(StatsUtil.java:2308)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.aggregateAverages(StatsUtil.java:832)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.aggregateBoltStats(StatsUtil.java:731)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.stats.StatsUtil.boltStreamsStats(StatsUtil.java:900)
~[storm-server-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.daemon.ui.UIHelpers.getVisualizationData(UIHelpers.java:1939)
~[storm-webapp-2.2.0.y.jar:2.2.0.y]
at
org.apache.storm.daemon.ui.resources.StormApiResource.getTopologyVisualization(StormApiResource.java:423)
~[storm-webapp-2.2.0.y.jar:2.2.0.y]
{code}
This is a bug in the code.
https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L1846-L1858
{code:java}
for (K kk : mm1.keySet()) {
List seq1 = mm1.get(kk);
List seq2 = mm2.get(kk);
List sums = new ArrayList();
for (int i = 0; i < seq1.size(); i++) {
if (seq1.get(i) instanceof Long) {
sums.add(((Number) seq1.get(i)).longValue() +
((Number) seq2.get(i)).longValue());
} else {
sums.add(((Number) seq1.get(i)).doubleValue() +
((Number) seq2.get(i)).doubleValue());
}
}
tmp.put(kk, sums);
}
{code}
It assumes that mm1 and mm2 always have the same keys, which is not true.
And it can be reproduced by my example code:
{code:java}
/**
 * Minimal topology used to reproduce the NullPointerException described in STORM-3598.
 *
 * <p>Two spouts each declare their own stream ("stream1" and "stream2") and emit directly
 * to a different hard-coded task id, while a single bolt with parallelism 2 subscribes to
 * both streams via direct grouping.
 */
public class WordCountTopology extends ConfigurableTopology {
    private static final Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);

    public static void main(String[] args) {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    /**
     * Wires up the two spouts and the bolt, then submits the topology.
     *
     * @param args optional command-line arguments; when present, {@code args[0]} replaces
     *             the default topology name "word-count"
     * @return whatever {@code submit} returns
     */
    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout1", new RandomSpout(1), 1);
        builder.setSpout("spout2", new RandomSpout(2), 1);
        builder.setBolt("bolt", new RandomBolt(), 2)
            .directGrouping("spout1", "stream1")
            .directGrouping("spout2", "stream2");
        String topologyName = "word-count";
        conf.setNumWorkers(3);
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    /**
     * Spout that emits one fixed sentence roughly once per second on its own stream,
     * named "stream" followed by the id passed to the constructor.
     */
    static class RandomSpout extends BaseRichSpout {
        String stream;
        int id;

        public RandomSpout(int id) {
            this.id = id;
            stream = "stream" + id;
        }

        int taskId = 0;
        SpoutOutputCollector collector;

        public void open(Map<String, Object> conf, TopologyContext context,
                         SpoutOutputCollector collector) {
            taskId = context.getThisTaskId();
            this.collector = collector;
        }

        /**
         * Each spout sends its tuples to a different task over a different stream:
         * the spout with id 1 targets task 2, any other spout targets task 3.
         */
        public void nextTuple() {
            LOG.info("emitting {}", id);
            // NOTE(review): task ids 2 and 3 are hard-coded; presumably they are the two
            // bolt tasks in the reporter's deployment -- confirm before reusing elsewhere.
            if (id == 1) {
                Values val = new Values("test a sentence");
                collector.emitDirect(2, stream, val, val);
            } else {
                Values val = new Values("test 2 sentence");
                collector.emitDirect(3, stream, val, val);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while sleeping between emits", e);
                // Restore the interrupt flag so the calling thread can still observe
                // the interruption instead of having it silently swallowed here.
                Thread.currentThread().interrupt();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(stream, new Fields("word"));
        }
    }

    /** Bolt that only logs the source component of every tuple and emits nothing. */
    static class RandomBolt extends BaseBasicBolt {
        public void execute(Tuple input, BasicOutputCollector collector) {
            LOG.info("executing:{}", input.getSourceComponent());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Intentionally empty: this bolt declares no output streams.
        }
    }
}
{code}
In this example, one of the bolt tasks will only receive data from stream1 and
the other bolt task will only receive data from stream2. So in the map lookups below,
{code:java}
List seq1 = mm1.get(kk);
List seq2 = mm2.get(kk);
{code}
seq1 is null if kk is stream1, seq2 is null if kk is stream2.
> Storm UI visualization throws NullPointerException
> --------------------------------------------------
>
> Key: STORM-3598
> URL: https://issues.apache.org/jira/browse/STORM-3598
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Ethan Li
> Assignee: Ethan Li
> Priority: Major
>
> We encountered an issue with visualization on UI.
>
> {code:java}
> 2020-03-09 19:59:01.756 o.a.s.d.u.r.StormApiResource qtp1919834117-167291
> [ERROR] Failure getting topology visualization
> java.lang.NullPointerException: null
> at
> org.apache.storm.stats.StatsUtil.mergeWithAddPair(StatsUtil.java:1855)
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
> at
> org.apache.storm.stats.StatsUtil.expandAveragesSeq(StatsUtil.java:2308)
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
> at
> org.apache.storm.stats.StatsUtil.aggregateAverages(StatsUtil.java:832)
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
> at
> org.apache.storm.stats.StatsUtil.aggregateBoltStats(StatsUtil.java:731)
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
> at
> org.apache.storm.stats.StatsUtil.boltStreamsStats(StatsUtil.java:900)
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
> at
> org.apache.storm.daemon.ui.UIHelpers.getVisualizationData(UIHelpers.java:1939)
> ~[storm-webapp-2.2.0.y.jar:2.2.0.y]
> at
> org.apache.storm.daemon.ui.resources.StormApiResource.getTopologyVisualization(StormApiResource.java:423)
> ~[storm-webapp-2.2.0.y.jar:2.2.0.y]
> {code}
> This is a bug in the code.
> https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L1846-L1858
> {code:java}
> for (K kk : mm1.keySet()) {
> List seq1 = mm1.get(kk);
> List seq2 = mm2.get(kk);
> List sums = new ArrayList();
> for (int i = 0; i < seq1.size(); i++) {
> if (seq1.get(i) instanceof Long) {
> sums.add(((Number) seq1.get(i)).longValue() +
> ((Number) seq2.get(i)).longValue());
> } else {
> sums.add(((Number) seq1.get(i)).doubleValue() +
> ((Number) seq2.get(i)).doubleValue());
> }
> }
> tmp.put(kk, sums);
> }
> {code}
> It assumes that mm1 and mm2 always have the same keys, which is not true.
> And it can be reproduced by my example code:
> {code:java}
> public class WordCountTopology extends ConfigurableTopology {
> private static final Logger LOG =
> LoggerFactory.getLogger(WordCountTopology.class);
> public static void main(String[] args) {
> ConfigurableTopology.start(new WordCountTopology(), args);
> }
> protected int run(String[] args) {
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("spout1", new RandomSpout(1), 1);
> builder.setSpout("spout2", new RandomSpout(2), 1);
> builder.setBolt("bolt", new RandomBolt(), 2).directGrouping("spout1",
> "stream1")
> .directGrouping("spout2", "stream2");
> String topologyName = "word-count";
> conf.setNumWorkers(3);
> if (args != null && args.length > 0) {
> topologyName = args[0];
> }
> return submit(topologyName, conf, builder);
> }
> static class RandomSpout extends BaseRichSpout {
> String stream;
> int id;
> public RandomSpout(int id) {
> this.id = id;
> stream = "stream" + id;
> }
> int taskId = 0;
> SpoutOutputCollector collector;
> public void open(Map<String, Object> conf, TopologyContext context,
> SpoutOutputCollector collector) {
> taskId = context.getThisTaskId();
> this.collector = collector;
> }
> /**
> * Different spout send tuples to different bolt via different stream.
> */
> public void nextTuple() {
> LOG.info("emitting {}", id);
> if (id == 1) {
> Values val = new Values("test a sentence");
> collector.emitDirect(2, stream, val, val);
> } else {
> Values val = new Values("test 2 sentence");
> collector.emitDirect(3, stream, val, val);
> }
> try {
> Thread.sleep(1000);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declareStream(stream, new Fields("word"));
> }
> }
> static class RandomBolt extends BaseBasicBolt {
> public void execute(Tuple input, BasicOutputCollector collector) {
> LOG.info("executing:" + input.getSourceComponent());
> }
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> }
> }
> }
> {code}
> In this example, one of the bolt tasks will only receive data from stream1 and
> the other bolt task will only receive data from stream2. So in the map lookups below,
> {code:java}
> List seq1 = mm1.get(kk);
> List seq2 = mm2.get(kk);
> {code}
> seq1 is null if kk is stream1, seq2 is null if kk is stream2.
>
> Other places that aggregate executor stats do not have this problem because
> they use different code
> https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L502-L513
> in which this case has already been taken care of.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)