Add a missing space, fix potential NPE, add comment to javadoc about reset timeout being expensive


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc79b4a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc79b4a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc79b4a8

Branch: refs/heads/master
Commit: bc79b4a8d757a3191a85815877345d38710c73e2
Parents: d36be51
Author: Stig Døssing <[email protected]>
Authored: Wed Mar 2 17:58:01 2016 +0100
Committer: Stig Døssing <[email protected]>
Committed: Wed Mar 2 17:59:14 2016 +0100

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/daemon/Acker.java          | 5 ++++-
 storm-core/src/jvm/org/apache/storm/task/OutputCollector.java  | 1 +
 .../jvm/org/apache/storm/topology/BasicOutputCollector.java    | 6 ++++++
 3 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index eb14af7..d7b9a2e 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -101,7 +101,10 @@ public class Acker implements IBolt {
             }
             curr.failed = true;
             pending.put(id, curr);
-        } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+            }
             pending.put(id, curr);
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());

http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
index 071d8aa..4db87f0 100644
--- a/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/task/OutputCollector.java
@@ -221,6 +221,7 @@ public class OutputCollector implements IOutputCollector {
     /**
     * Resets the message timeout for any tuple trees to which the given tuple belongs.
     * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+    * Note that this is an expensive operation, and should be used sparingly.
     * @param input the tuple to reset timeout for
     */
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bc79b4a8/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
index 343c349..1d1e5ff 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/BasicOutputCollector.java
@@ -52,6 +52,12 @@ public class BasicOutputCollector implements IBasicOutputCollector {
         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
     }
 
+    /**
+    * Resets the message timeout for any tuple trees to which the given tuple belongs.
+    * The timeout is reset to Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+    * Note that this is an expensive operation, and should be used sparingly.
+    * @param tuple the tuple to reset timeout for
+    */
     public void resetTimeout(Tuple tuple){
         out.resetTimeout(tuple);
     }

Reply via email to