Repository: storm Updated Branches: refs/heads/master d8384f43b -> 29a44e57a
STORM-2231 Fix multi-threads issue on executor send queue * we have use cases which launches threads and emit/ack concurrently * launched threads will create ThreadLocalBatcher for each * hence 'synchronized' in add() wouldn't help unlike background flushes Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b15fc4f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b15fc4f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b15fc4f Branch: refs/heads/master Commit: 2b15fc4fc7189f2f42a0fea13f2ca00a6675d6c7 Parents: 0bf7e70 Author: Jungtaek Lim <[email protected]> Authored: Thu Aug 24 20:11:41 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Thu Aug 24 20:23:43 2017 +0900 ---------------------------------------------------------------------- storm-client/src/jvm/org/apache/storm/executor/Executor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2b15fc4f/storm-client/src/jvm/org/apache/storm/executor/Executor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index c1c6350..842141c 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -387,7 +387,7 @@ public abstract class Executor implements Callable, EventHandler<Object> { int waitTimeOutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS)); int batchSize = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)); int batchTimeOutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS)); - return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.SINGLE, + return new DisruptorQueue("executor" + executorId + "-send-queue", ProducerType.MULTI, sendSize, waitTimeOutMs, batchSize, batchTimeOutMs); }
