STORM-756 Introduce ShellBoltMessageQueue * ShellBoltMessageQueue contains two different queues ** one for task ids (unbounded) ** another for bolt msgs (bounded) * Poll priority between the two queues: task ids take precedence over bolt msgs ** poll() returns task ids whenever they are available, and returns a bolt msg only if no task ids are available * poll() behaves like LinkedBlockingQueue.poll() with a timeout ** it waits while nothing is available, and wakes up when an element becomes available or the timeout elapses
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/56dc7b9d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/56dc7b9d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/56dc7b9d Branch: refs/heads/master Commit: 56dc7b9d25f6b80856c541a71a036b2574d58ae8 Parents: 124f664 Author: Jungtaek Lim <[email protected]> Authored: Tue Nov 24 14:56:43 2015 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Tue Nov 24 14:56:43 2015 +0900 ---------------------------------------------------------------------- .../src/jvm/backtype/storm/task/ShellBolt.java | 31 +++-- .../storm/utils/ShellBoltMessageQueue.java | 121 +++++++++++++++++++ 2 files changed, 136 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/56dc7b9d/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index 1d97d53..215094b 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -26,6 +26,7 @@ import backtype.storm.multilang.BoltMsg; import backtype.storm.multilang.ShellMsg; import backtype.storm.topology.ReportedFailedException; import backtype.storm.tuple.Tuple; +import backtype.storm.utils.ShellBoltMessageQueue; import backtype.storm.utils.ShellProcess; import clojure.lang.RT; import com.google.common.util.concurrent.MoreExecutors; @@ -37,6 +38,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static java.util.concurrent.TimeUnit.SECONDS; + /** * A bolt that shells out to another process to process tuples. 
ShellBolt * communicates with that process over stdio using a special protocol. An ~100 @@ -75,8 +78,7 @@ public class ShellBolt implements IBolt { private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; - private LinkedBlockingQueue<BoltMsg> _pendingWrites = new LinkedBlockingQueue<>(); - private LinkedBlockingQueue<List<Integer>> _pendingTaskIds = new LinkedBlockingQueue<>(); + private ShellBoltMessageQueue _pendingWrites = new ShellBoltMessageQueue(); private Random _rand; private Thread _readerThread; @@ -106,8 +108,9 @@ public class ShellBolt implements IBolt { final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); if (maxPending != null) { - this._pendingWrites = new LinkedBlockingQueue<>(((Number)maxPending).intValue()); + this._pendingWrites = new ShellBoltMessageQueue(((Number)maxPending).intValue()); } + _rand = new Random(); _collector = collector; @@ -149,7 +152,7 @@ public class ShellBolt implements IBolt { try { BoltMsg boltMsg = createBoltMessage(input, genId); - _pendingWrites.put(boltMsg); + _pendingWrites.putBoltMsg(boltMsg); } catch(InterruptedException e) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); throw new RuntimeException("Error during multilang processing " + processInfo, e); @@ -211,7 +214,7 @@ public class ShellBolt implements IBolt { if(shellMsg.getTask() == 0) { List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple()); if (shellMsg.areTaskIdsNeeded()) { - _pendingTaskIds.put(outtasks); + _pendingWrites.putTaskIds(outtasks); } } else { _collector.emitDirect((int) shellMsg.getTask(), @@ -373,17 +376,13 @@ public class ShellBolt implements IBolt { sendHeartbeatFlag.compareAndSet(true, false); } - List<Integer> taskIds = _pendingTaskIds.peek(); - if (taskIds != null) { - taskIds = _pendingTaskIds.poll(); - _process.writeTaskIds(taskIds); - 
continue; - } - - BoltMsg write = _pendingWrites.peek(); - if (write != null) { - write = _pendingWrites.poll(); - _process.writeBoltMsg(write); + Object write = _pendingWrites.poll(1, SECONDS); + if (write instanceof BoltMsg) { + _process.writeBoltMsg((BoltMsg) write); + } else if (write instanceof List<?>) { + _process.writeTaskIds((List<Integer>)write); + } else if (write != null) { + throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); } } catch (Throwable t) { die(t); http://git-wip-us.apache.org/repos/asf/storm/blob/56dc7b9d/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java new file mode 100644 index 0000000..b633bc5 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/ShellBoltMessageQueue.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package backtype.storm.utils;
+
+import backtype.storm.multilang.BoltMsg;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A message queue for ShellBolt that combines two FIFO queues: an unbounded
+ * queue of task ids and a (possibly bounded) queue of bolt messages.
+ * {@link #poll(long, TimeUnit)} always drains task ids before bolt messages.
+ *
+ * <p>Every access to the task-id queue happens while holding {@code takeLock}:
+ * {@link LinkedList} is not thread-safe, and producers may call
+ * {@link #putTaskIds(List)} concurrently with {@link #poll(long, TimeUnit)}.
+ * The bolt-message queue is a {@link LinkedBlockingQueue}, which is thread-safe
+ * on its own and supplies the capacity/blocking behavior of
+ * {@link #putBoltMsg(BoltMsg)}.
+ */
+public class ShellBoltMessageQueue implements Serializable {
+    /** Pending task ids; guarded by {@code takeLock}. */
+    private final LinkedList<List<Integer>> taskIdsQueue = new LinkedList<>();
+    /** Pending bolt messages; bounded by the capacity given to the constructor. */
+    private final LinkedBlockingQueue<BoltMsg> boltMsgQueue;
+
+    /** Guards {@code taskIdsQueue} and owns the {@code notEmpty} condition. */
+    private final ReentrantLock takeLock = new ReentrantLock();
+    /** Signalled whenever an element is added to either queue. */
+    private final Condition notEmpty = takeLock.newCondition();
+
+    /**
+     * Creates a queue whose bolt-message part holds at most {@code boltMsgCapacity} entries.
+     *
+     * @param boltMsgCapacity maximum number of pending bolt messages; must be positive
+     * @throws IllegalArgumentException if {@code boltMsgCapacity} is not positive
+     */
+    public ShellBoltMessageQueue(int boltMsgCapacity) {
+        if (boltMsgCapacity <= 0) {
+            throw new IllegalArgumentException("boltMsgCapacity must be positive: " + boltMsgCapacity);
+        }
+        this.boltMsgQueue = new LinkedBlockingQueue<>(boltMsgCapacity);
+    }
+
+    /**
+     * Creates a queue whose bolt-message part is effectively unbounded
+     * ({@link Integer#MAX_VALUE} entries).
+     */
+    public ShellBoltMessageQueue() {
+        this(Integer.MAX_VALUE);
+    }
+
+    /**
+     * Appends a list of task ids to the task-id queue. The task-id queue is
+     * unbounded, so this never waits for capacity.
+     *
+     * @param taskIds task ids that received the tuples; must not be null
+     * @throws NullPointerException if {@code taskIds} is null
+     */
+    public void putTaskIds(List<Integer> taskIds) {
+        if (taskIds == null) {
+            // A null entry would look like "empty" to poll() and would be stuck at the
+            // head forever, hiding every task-id list queued behind it.
+            throw new NullPointerException("taskIds");
+        }
+        takeLock.lock();
+        try {
+            // LinkedList is not thread-safe: mutate it only under the lock poll() uses.
+            taskIdsQueue.add(taskIds);
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Appends a bolt message, waiting if necessary for space in the bolt-message queue.
+     *
+     * @param boltMsg BoltMsg to pass to the subprocess
+     * @throws InterruptedException if interrupted while waiting for space
+     */
+    public void putBoltMsg(BoltMsg boltMsg) throws InterruptedException {
+        boltMsgQueue.put(boltMsg);
+        // The lock is taken only to signal: poll() checks emptiness and awaits while
+        // holding it, so this signal cannot fall between its check and its await.
+        takeLock.lock();
+        try {
+            notEmpty.signal();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of one of the two queues, waiting up to the
+     * specified time if necessary for an element to become available.
+     * Task ids take priority: a bolt message is returned only when no task ids are pending.
+     *
+     * @param timeout how long to wait before giving up, in units of {@code unit}
+     * @param unit a TimeUnit determining how to interpret the {@code timeout} parameter
+     * @return a {@code List<Integer>} of task ids if any are pending,
+     *         otherwise a {@code BoltMsg} if one is pending,
+     *         or {@code null} if the waiting time elapses before an element is available
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+        // Computed before locking so a bad argument cannot leave the lock held.
+        long nanos = unit.toNanos(timeout);
+        takeLock.lockInterruptibly();
+        try {
+            // Wait until either queue has an element or the timeout elapses.
+            while (taskIdsQueue.isEmpty() && boltMsgQueue.peek() == null) {
+                if (nanos <= 0) {
+                    return null;
+                }
+                nanos = notEmpty.awaitNanos(nanos);
+            }
+
+            // Task ids first.
+            List<Integer> taskIds = taskIdsQueue.poll();
+            if (taskIds != null) {
+                return taskIds;
+            }
+
+            // boltMsgQueue is drained only here, under takeLock, so it is still non-empty.
+            return boltMsgQueue.poll();
+        } finally {
+            takeLock.unlock();
+        }
+    }
+
+}
