GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2308#discussion_r136989219

    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -78,27 +76,28 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         @Override
         protected void process(final Tuple tuple) {
    -        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    -        LOG.debug("Sent tuple : [{}]", tuple);
    +        final E mappedEvent = druidEventMapper.getEvent(tuple);
    +        Future future = tranquilizer.send(mappedEvent);
    +        LOG.debug("Sent tuple : [{}]", mappedEvent);

             future.addEventListener(new FutureEventListener() {
                 @Override
                 public void onFailure(Throwable cause) {
                     if (cause instanceof MessageDroppedException) {
                         collector.ack(tuple);
    -                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
    +                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", mappedEvent, cause);
                         if (druidConfig.getDiscardStreamId() != null)
                             collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
                     } else {
                         collector.fail(tuple);
    -                    LOG.debug("Tuple Processing Failed : [{}]", tuple);
    +                    LOG.debug("Tuple Processing Failed : [{}]", mappedEvent, cause);
    --- End diff --

    nit: why is a tuple failing logged as a debug message?
---