Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r158034510

--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException {
         return new ArrayList<>(0);
     }
+
     public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
         if (debug) {
             LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
         }
-        List<Integer> outTasks = new ArrayList<>();
-        if (!streamComponentToGrouper.containsKey(stream)) {
-            throw new IllegalArgumentException("Unknown stream ID: " + stream);
-        }
-        if (null != streamComponentToGrouper.get(stream)) {
-            // null value for __system
-            for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+        ArrayList<Integer> outTasks = new ArrayList<>();
+
+        // TODO: PERF: expensive hashtable lookup in critical path
+        ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
+        if (null != groupers) {
+            for (int i=0; i<groupers.size(); ++i) {
+                LoadAwareCustomStreamGrouping grouper = groupers.get(i);
                 if (grouper == GrouperFactory.DIRECT) {
                     throw new IllegalArgumentException("Cannot do regular emit to direct stream");
                 }
                 List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
-                outTasks.addAll(compTasks);
+                outTasks.addAll(compTasks); // TODO: PERF: this is a perf hit
--- End diff --

Same here: it may be better to decide. IMHO I'm still not convinced that this introduces a performance hit. The only costs I can imagine are allocating the backing array (only once per method call) and expanding it, but unless we use fan-out in a very large topology, outTasks is expected to be small.
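
To make the point about `outTasks` staying small concrete, here is a minimal sketch of the same accumulation pattern outside of Storm. It is not the PR's code: `OutTasksSketch`, `collect`, and the sample task ids are hypothetical, and the nested lists stand in for what each grouper's `chooseTasks` call might return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OutTasksSketch {

    // Hypothetical stand-in for the loop in getOutgoingTasks: each inner list
    // plays the role of one grouper's chooseTasks(...) result.
    static List<Integer> collect(List<List<Integer>> perGrouperTasks) {
        // One list per call. The ArrayList Javadoc documents an initial
        // capacity of ten for the no-arg constructor, so a handful of task
        // ids should fit without the list having to expand.
        List<Integer> outTasks = new ArrayList<>();
        for (int i = 0; i < perGrouperTasks.size(); ++i) {
            outTasks.addAll(perGrouperTasks.get(i));
        }
        return outTasks;
    }

    public static void main(String[] args) {
        // Assumed sample: three downstream groupers, four target tasks total.
        List<List<Integer>> perGrouper = Arrays.asList(
                Arrays.asList(3),
                Arrays.asList(7),
                Arrays.asList(11, 12));
        System.out.println(collect(perGrouper)); // prints [3, 7, 11, 12]
    }
}
```

With only a few subscribed groupers, each choosing one or two tasks, the list stays well under ten elements; expansion would only come into play with a much larger fan-out, which would need measuring rather than assuming.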
---