Repository: storm
Updated Branches:
  refs/heads/master d0c5457c0 -> 6207d320c


STORM-2786: Turn ticks back on for ackers (and optimize ackers a bit)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09e9869
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09e9869
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09e9869

Branch: refs/heads/master
Commit: b09e98698e298ad0edaf1f4916c663875d387c2e
Parents: 7b940ae
Author: Robert (Bobby) Evans <[email protected]>
Authored: Mon Oct 23 11:31:46 2017 -0500
Committer: Robert (Bobby) Evans <[email protected]>
Committed: Mon Oct 23 13:15:39 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 43 ++++++++++----------
 .../jvm/org/apache/storm/executor/Executor.java | 21 ++++------
 2 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b09e9869/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java 
b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
index c41baee..8675e39 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
@@ -22,6 +22,7 @@ import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.RotatingMap;
 import org.apache.storm.utils.TupleUtils;
@@ -46,11 +47,11 @@ public class Acker implements IBolt {
     private OutputCollector collector;
     private RotatingMap<Object, AckObject> pending;
 
-    private class AckObject {
+    private static class AckObject {
         public long val = 0L;
-        public Integer spoutTask = null;
+        public long startTime = Time.currentTimeMillis();
+        public int spoutTask = -1;
         public boolean failed = false;
-        public long startTime = System.currentTimeMillis();
 
         // val xor value
         public void updateAck(Long value) {
@@ -61,7 +62,7 @@ public class Acker implements IBolt {
     @Override
     public void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector) {
         this.collector = collector;
-        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+        this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
     }
 
     @Override
@@ -72,28 +73,23 @@ public class Acker implements IBolt {
             return;
         }
 
+        boolean resetTimeout = false;
         String streamId = input.getSourceStreamId();
         Object id = input.getValue(0);
         AckObject curr = pending.get(id);
         if (ACKER_INIT_STREAM_ID.equals(streamId)) {
             if (curr == null) {
                 curr = new AckObject();
-                curr.val = input.getLong(1);
-                curr.spoutTask = input.getInteger(2);
                 pending.put(id, curr);
-            } else {
-                // If receiving bolt's ack before the init message from spout, 
just update the xor value.
-                curr.updateAck(input.getLong(1));
-                curr.spoutTask = input.getInteger(2);
             }
+            curr.updateAck(input.getLong(1));
+            curr.spoutTask = input.getInteger(2);
         } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
-            if (curr != null) {
-                curr.updateAck(input.getLong(1));
-            } else {
+            if (curr == null) {
                 curr = new AckObject();
-                curr.val = input.getLong(1);
                 pending.put(id, curr);
             }
+            curr.updateAck(input.getLong(1));
         } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
             // For the case that ack_fail message arrives before ack_init
             if (curr == null) {
@@ -102,17 +98,18 @@ public class Acker implements IBolt {
             curr.failed = true;
             pending.put(id, curr);
         } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
-            if (curr == null) {
-                curr = new AckObject();
-            }
-            pending.put(id, curr);
+            resetTimeout = true;
+            if (curr != null) {
+                pending.put(id, curr);
+            } //else if it has not been added yet, there is no reason to time it 
out later on
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, 
input.getSourceTask());
             return;
         }
 
-        Integer task = curr.spoutTask;
-        if (curr != null && task != null) {
+        int task = curr.spoutTask;
+        if (curr != null && task >= 0
+            && (curr.val == 0 || curr.failed || resetTimeout)) {
             Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
             if (curr.val == 0) {
                 pending.remove(id);
@@ -120,8 +117,10 @@ public class Acker implements IBolt {
             } else if (curr.failed) {
                 pending.remove(id);
                 collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
-            } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+            } else if(resetTimeout) {
                 collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, 
tuple);
+            } else {
+                throw new IllegalStateException("The checks are inconsistent 
we reach what should be unreachable code.");
             }
         }
 
@@ -134,6 +133,6 @@ public class Acker implements IBolt {
     }
 
     private long getTimeDeltaMillis(long startTimeMillis) {
-        return System.currentTimeMillis() - startTimeMillis;
+        return Time.currentTimeMillis() - startTimeMillis;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e9869/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java 
b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index e55aca0..3c39194 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -40,6 +40,7 @@ import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.Acker;
 import org.apache.storm.daemon.GrouperFactory;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.Task;
@@ -381,20 +382,16 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         final Integer tickTimeSecs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
         boolean enableMessageTimeout = (Boolean) 
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
         if (tickTimeSecs != null) {
-            if (Utils.isSystemId(componentId) || (!enableMessageTimeout && 
isSpout)) {
-                LOG.info("Timeouts disabled for executor " + componentId + ":" 
+ executorId);
+            if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && 
Utils.isSystemId(componentId))
+                || (!enableMessageTimeout && isSpout)) {
+                LOG.info("Timeouts disabled for executor {}:{}", componentId, 
executorId);
             } else {
                 StormTimer timerTask = workerData.getUserTimer();
-                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new 
Runnable() {
-                    @Override
-                    public void run() {
-                        TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(tickTimeSecs),
-                                (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
-                        List<AddressedTuple> tickTuple =
-                                Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-                        receiveQueue.publish(tickTuple);
-                    }
-                });
+                TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(tickTimeSecs),
+                    (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
+                final List<AddressedTuple> tickTuple =
+                    Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
+                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> 
receiveQueue.publish(tickTuple));
             }
         }
     }

Reply via email to