Repository: storm Updated Branches: refs/heads/master d0c5457c0 -> 6207d320c
STORM-2786: Turn ticks back on for ackers (and optimizes ackers a bit) Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09e9869 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09e9869 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09e9869 Branch: refs/heads/master Commit: b09e98698e298ad0edaf1f4916c663875d387c2e Parents: 7b940ae Author: Robert (Bobby) Evans <[email protected]> Authored: Mon Oct 23 11:31:46 2017 -0500 Committer: Robert (Bobby) Evans <[email protected]> Committed: Mon Oct 23 13:15:39 2017 -0500 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/daemon/Acker.java | 43 ++++++++++---------- .../jvm/org/apache/storm/executor/Executor.java | 21 ++++------ 2 files changed, 30 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b09e9869/storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java index c41baee..8675e39 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java @@ -22,6 +22,7 @@ import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.apache.storm.utils.RotatingMap; import org.apache.storm.utils.TupleUtils; @@ -46,11 +47,11 @@ public class Acker implements IBolt { private OutputCollector collector; private RotatingMap<Object, AckObject> pending; - private class AckObject { + private static class AckObject { public long val = 0L; - public Integer spoutTask = null; + public long startTime = Time.currentTimeMillis(); + public int spoutTask = -1; public boolean failed = false; - public long startTime = System.currentTimeMillis(); // val xor value public void updateAck(Long value) { @@ -61,7 +62,7 @@ public class Acker implements IBolt { @Override public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { this.collector = collector; - this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM); + this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM); } @Override @@ -72,28 +73,23 @@ public class Acker implements IBolt { return; } + boolean resetTimeout = false; String streamId = input.getSourceStreamId(); Object id = input.getValue(0); AckObject curr = pending.get(id); if (ACKER_INIT_STREAM_ID.equals(streamId)) { if (curr == null) { curr = new AckObject(); - curr.val = input.getLong(1); - curr.spoutTask = input.getInteger(2); pending.put(id, curr); - } else { - // If receiving bolt's ack before the init message from spout, just update the xor value. - curr.updateAck(input.getLong(1)); - curr.spoutTask = input.getInteger(2); } + curr.updateAck(input.getLong(1)); + curr.spoutTask = input.getInteger(2); } else if (ACKER_ACK_STREAM_ID.equals(streamId)) { - if (curr != null) { - curr.updateAck(input.getLong(1)); - } else { + if (curr == null) { curr = new AckObject(); - curr.val = input.getLong(1); pending.put(id, curr); } + curr.updateAck(input.getLong(1)); } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) { // For the case that ack_fail message arrives before ack_init if (curr == null) { @@ -102,17 +98,18 @@ public class Acker implements IBolt { curr.failed = true; pending.put(id, curr); } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { - if (curr == null) { - curr = new AckObject(); - } - pending.put(id, curr); + resetTimeout = true; + if (curr != null) { + pending.put(id, curr); + } //else if it has not been added yet, there is no reason time it out later on } else { LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask()); return; } - Integer task = curr.spoutTask; - if (curr != null && task != null) { + int task = curr.spoutTask; + if (curr != null && task >= 0 + && (curr.val == 0 || curr.failed || resetTimeout)) { Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime)); if (curr.val == 0) { pending.remove(id); @@ -120,8 +117,10 @@ public class Acker implements IBolt { } else if (curr.failed) { pending.remove(id); collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple); - } else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { + } else if(resetTimeout) { collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple); + } else { + throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code."); } } @@ -134,6 +133,6 @@ public class Acker implements IBolt { } private long getTimeDeltaMillis(long startTimeMillis) { - return System.currentTimeMillis() - startTimeMillis; + return Time.currentTimeMillis() - startTimeMillis; } } http://git-wip-us.apache.org/repos/asf/storm/blob/b09e9869/storm-client/src/jvm/org/apache/storm/executor/Executor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index e55aca0..3c39194 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -40,6 +40,7 @@ import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.Acker; import org.apache.storm.daemon.GrouperFactory; import org.apache.storm.daemon.StormCommon; import org.apache.storm.daemon.Task; @@ -381,20 +382,16 @@ public abstract class Executor implements Callable, EventHandler<Object> { final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null); boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); if (tickTimeSecs != null) { - if (Utils.isSystemId(componentId) || (!enableMessageTimeout && isSpout)) { - LOG.info("Timeouts disabled for executor " + componentId + ":" + executorId); + if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId)) + || (!enableMessageTimeout && isSpout)) { + LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId); } else { StormTimer timerTask = workerData.getUserTimer(); - timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, new Runnable() { - @Override - public void run() { - TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs), - (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID); - List<AddressedTuple> tickTuple = - Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple)); - receiveQueue.publish(tickTuple); - } - }); + TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(tickTimeSecs), + (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID); + final List<AddressedTuple> tickTuple = + Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple)); + timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> receiveQueue.publish(tickTuple)); } } }
