Github user omkreddy commented on a diff in the pull request:
https://github.com/apache/storm/pull/2308#discussion_r137044069
--- Diff:
external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
---
@@ -78,27 +76,28 @@ public void prepare(Map stormConf, TopologyContext
context, OutputCollector coll
@Override
protected void process(final Tuple tuple) {
- Future future =
tranquilizer.send((druidEventMapper.getEvent(tuple)));
- LOG.debug("Sent tuple : [{}]", tuple);
+ final E mappedEvent = druidEventMapper.getEvent(tuple);
+ Future future = tranquilizer.send(mappedEvent);
+ LOG.debug("Sent tuple : [{}]", mappedEvent);
future.addEventListener(new FutureEventListener() {
@Override
public void onFailure(Throwable cause) {
if (cause instanceof MessageDroppedException) {
collector.ack(tuple);
- LOG.debug("Tuple Dropped due to
MessageDroppedException : [{}]", tuple);
+ LOG.debug("Tuple Dropped due to
MessageDroppedException : [{}]", mappedEvent, cause);
if (druidConfig.getDiscardStreamId() != null)
collector.emit(druidConfig.getDiscardStreamId(),
new Values(tuple, System.currentTimeMillis()));
} else {
collector.fail(tuple);
- LOG.debug("Tuple Processing Failed : [{}]", tuple);
+ LOG.debug("Tuple Processing Failed : [{}]",
mappedEvent, cause);
--- End diff --
error level more suitable here. Updated the PR.
---