http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 1fb3be3..9c7cf9e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ package org.apache.storm.daemon.worker;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,7 +30,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -38,9 +41,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.StormTimer;
+import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.cluster.VersionedData;
@@ -63,29 +68,33 @@ import org.apache.storm.messaging.ConnectionWithStatus;
 import org.apache.storm.messaging.DeserializingConnectionCallback;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
-import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.messaging.TransportFactory;
 import org.apache.storm.security.auth.IAutoCredentials;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ThriftTopologyUtils;
-import org.apache.storm.utils.TransferDrainer;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class WorkerState {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WorkerState.class);
+    private static long dropCount = 0;
 
     final Map<String, Object> conf;
     final IContext mqContext;
+    private final WorkerTransfer workerTransfer;
+    private final BackPressureTracker bpTracker;
 
     public Map getConf() {
         return conf;
@@ -119,12 +128,16 @@ public class WorkerState {
         return stormComponentToDebug;
     }
 
-    public Set<List<Long>> getExecutors() {
-        return executors;
+    public Set<List<Long>> getLocalExecutors() {
+        return localExecutors;
+    }
+
+    public List<Integer> getLocalTaskIds() {
+        return localTaskIds;
     }
 
-    public List<Integer> getTaskIds() {
-        return taskIds;
+    public Map<Integer, JCQueue> getLocalReceiveQueues() {
+        return localReceiveQueues;
     }
 
     public Map getTopologyConf() {
@@ -159,10 +172,14 @@ public class WorkerState {
         return cachedNodeToPortSocket;
     }
 
-    public Map<List<Long>, DisruptorQueue> getExecutorReceiveQueueMap() {
+    public Map<List<Long>, JCQueue> getExecutorReceiveQueueMap() {
         return executorReceiveQueueMap;
     }
 
+    public Map<Integer, JCQueue> getShortExecutorReceiveQueueMap() {
+        return shortExecutorReceiveQueueMap;
+    }
+
     public Runnable getSuicideCallback() {
         return suicideCallback;
     }
@@ -195,9 +212,11 @@ public class WorkerState {
     final AtomicBoolean isTopologyActive;
     final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
 
-    // executors and taskIds running in this worker
-    final Set<List<Long>> executors;
-    final List<Integer> taskIds;
+    // local executors and localTaskIds running in this worker
+    final Set<List<Long>> localExecutors;
+    final ArrayList<Integer> localTaskIds;
+    final Map<Integer, JCQueue> localReceiveQueues = new HashMap<>(); // 
[taskId]-> JCQueue :  initialized after local executors are initialized
+
     final Map<String, Object> topologyConf;
     final StormTopology topology;
     final StormTopology systemTopology;
@@ -208,10 +227,10 @@ public class WorkerState {
     final ReentrantReadWriteLock endpointSocketLock;
     final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
     final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
-    final Map<List<Long>, DisruptorQueue> executorReceiveQueueMap;
+    final Map<List<Long>, JCQueue> executorReceiveQueueMap;
     // executor id is in form [start_task_id end_task_id]
     // short executor id is start_task_id
-    final Map<Integer, DisruptorQueue> shortExecutorReceiveQueueMap;
+    final Map<Integer, JCQueue> shortExecutorReceiveQueueMap;
     final Map<Integer, Integer> taskToShortExecutor;
     final Runnable suicideCallback;
     final Utils.UptimeComputer uptime;
@@ -219,16 +238,6 @@ public class WorkerState {
     final Map<String, Object> userSharedResources;
     final LoadMapping loadMapping;
     final AtomicReference<Map<String, VersionedData<Assignment>>> 
assignmentVersions;
-    // Whether this worker is going slow. 0 indicates the backpressure is off
-    final AtomicLong backpressure = new AtomicLong(0);
-    // How long until the backpressure znode is invalid.
-    final long backpressureZnodeTimeoutMs;
-    // If the transfer queue is backed-up
-    final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
-    // a trigger for synchronization with executors
-    final AtomicBoolean backpressureTrigger = new AtomicBoolean(false);
-    // whether the throttle is activated for spouts
-    final AtomicBoolean throttleOn = new AtomicBoolean(false);
 
     public LoadMapping getLoadMapping() {
         return loadMapping;
@@ -238,24 +247,10 @@ public class WorkerState {
         return assignmentVersions;
     }
 
-    public AtomicBoolean getBackpressureTrigger() {
-        return backpressureTrigger;
-    }
-
-    public AtomicBoolean getThrottleOn() {
-        return throttleOn;
-    }
-
-    public DisruptorQueue getTransferQueue() {
-        return transferQueue;
-    }
-
     public StormTimer getUserTimer() {
         return userTimer;
     }
 
-    final DisruptorQueue transferQueue;
-
     // Timers
     final StormTimer heartbeatTimer = mkHaltingTimer("heartbeat-timer");
     final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer");
@@ -265,16 +260,16 @@ public class WorkerState {
     final StormTimer resetLogLevelsTimer = 
mkHaltingTimer("reset-log-levels-timer");
     final StormTimer refreshActiveTimer = 
mkHaltingTimer("refresh-active-timer");
     final StormTimer executorHeartbeatTimer = 
mkHaltingTimer("executor-heartbeat-timer");
-    final StormTimer refreshBackpressureTimer = 
mkHaltingTimer("refresh-backpressure-timer");
+    final StormTimer flushTupleTimer = mkHaltingTimer("flush-tuple-timer");
     final StormTimer userTimer = mkHaltingTimer("user-timer");
+    final StormTimer backPressureCheckTimer = 
mkHaltingTimer("backpressure-check-timer");
 
     // global variables only used internally in class
     private final Set<Integer> outboundTasks;
-    private final AtomicLong nextUpdate = new AtomicLong(0);
+    private final AtomicLong nextLoadUpdate = new AtomicLong(0);
+
     private final boolean trySerializeLocal;
-    private final TransferDrainer drainer;
     private final Collection<IAutoCredentials> autoCredentials;
-
     private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
 
     public WorkerState(Map<String, Object> conf, IContext mqContext, String 
topologyId, String assignmentId, int port, String workerId,
@@ -282,12 +277,7 @@ public class WorkerState {
                        Collection<IAutoCredentials> autoCredentials)
         throws IOException, InvalidTopologyException {
         this.autoCredentials = autoCredentials;
-        this.executors = new HashSet<>(readWorkerExecutors(stormClusterState, 
topologyId, assignmentId, port));
-        this.transferQueue = new DisruptorQueue("worker-transfer-queue",
-            
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)),
-            (long) 
topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
-            
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
-            (long) 
topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
+        this.localExecutors = new 
HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, 
port));
 
         this.conf = conf;
         this.mqContext = (null != mqContext) ? mqContext : 
TransportFactory.makeContext(topologyConf);
@@ -301,17 +291,16 @@ public class WorkerState {
         this.isWorkerActive = new AtomicBoolean(false);
         this.isTopologyActive = new AtomicBoolean(false);
         this.stormComponentToDebug = new AtomicReference<>();
-        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, 
executors);
+        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, 
localExecutors);
         this.shortExecutorReceiveQueueMap = new HashMap<>();
-        this.taskIds = new ArrayList<>();
+        this.localTaskIds = new ArrayList<>();
         this.blobToLastKnownVersion = new ConcurrentHashMap<>();
-        for (Map.Entry<List<Long>, DisruptorQueue> entry : 
executorReceiveQueueMap.entrySet()) {
+        for (Map.Entry<List<Long>, JCQueue> entry : 
executorReceiveQueueMap.entrySet()) {
             
this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), 
entry.getValue());
-            this.taskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
+            
this.localTaskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
         }
-        Collections.sort(taskIds);
+        Collections.sort(localTaskIds);
         this.topologyConf = topologyConf;
-        this.backpressureZnodeTimeoutMs = 
ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 
1000;
         this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, 
AdvancedFSOps.make(conf));
         this.systemTopology = StormCommon.systemTopology(topologyConf, 
topology);
         this.taskToComponent = StormCommon.stormTaskInfo(topology, 
topologyConf);
@@ -330,7 +319,7 @@ public class WorkerState {
         this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
         this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
         this.taskToShortExecutor = new HashMap<>();
-        for (List<Long> executor : this.executors) {
+        for (List<Long> executor : this.localExecutors) {
             for (Integer task : StormCommon.executorIdToTasks(executor)) {
                 taskToShortExecutor.put(task, executor.get(0).intValue());
             }
@@ -347,8 +336,9 @@ public class WorkerState {
         if (trySerializeLocal) {
             LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for 
production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
         }
-        this.drainer = new TransferDrainer();
-
+        int maxTaskId = getMaxTaskId(componentToSortedTasks);
+        this.workerTransfer = new WorkerTransfer(this, topologyConf, 
maxTaskId);
+        this.bpTracker = new BackPressureTracker(workerId, localTaskIds);
     }
 
     public void refreshConnections() {
@@ -359,6 +349,10 @@ public class WorkerState {
         }
     }
 
+    public SmartThread makeTransferThread() {
+        return workerTransfer.makeTransferThread();
+    }
+
     public void refreshConnections(Runnable callback) throws Exception {
         Integer version = stormClusterState.assignmentVersion(topologyId, 
callback);
         version = (null == version) ? 0 : version;
@@ -386,7 +380,7 @@ public class WorkerState {
                 Integer task = taskToNodePortEntry.getKey();
                 if (outboundTasks.contains(task)) {
                     newTaskToNodePort.put(task, 
taskToNodePortEntry.getValue());
-                    if (!taskIds.contains(task)) {
+                    if (!localTaskIds.contains(task)) {
                         neededConnections.add(taskToNodePortEntry.getValue());
                     }
                 }
@@ -405,7 +399,8 @@ public class WorkerState {
                     mqContext.connect(
                         topologyId,
                         assignment.get_node_host().get(nodeInfo.get_node()),   
 // Host
-                        nodeInfo.get_port().iterator().next().intValue()));    
 // Port
+                        nodeInfo.get_port().iterator().next().intValue(),      
 // Port
+                        workerTransfer.getRemoteBackPressureStatus()));
             }
             return next;
         });
@@ -439,8 +434,8 @@ public class WorkerState {
         StormBase base = stormClusterState.stormBase(topologyId, callback);
         isTopologyActive.set(
             (null != base) &&
-            (base.get_status() == TopologyStatus.ACTIVE) &&
-            (isWorkerActive.get()));
+                (base.get_status() == TopologyStatus.ACTIVE) &&
+                (isWorkerActive.get()));
         if (null != base) {
             Map<String, DebugOptions> debugOptionsMap = new 
HashMap<>(base.get_component_debug());
             for (DebugOptions debugOptions : debugOptionsMap.values()) {
@@ -456,24 +451,18 @@ public class WorkerState {
         }
     }
 
-    public void refreshThrottle() {
-        boolean backpressure = 
stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, 
this::refreshThrottle);
-        this.throttleOn.set(backpressure);
-    }
-
-    private static double getQueueLoad(DisruptorQueue q) {
-        DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+    private static double getQueueLoad(JCQueue q) {
+        JCQueue.QueueMetrics qMetrics = q.getMetrics();
         return ((double) qMetrics.population()) / qMetrics.capacity();
     }
 
     public void refreshLoad(List<IRunningExecutor> execs) {
-        Set<Integer> remoteTasks = Sets.difference(new 
HashSet<>(outboundTasks), new HashSet<>(taskIds));
+        Set<Integer> remoteTasks = Sets.difference(new 
HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
         Long now = System.currentTimeMillis();
         Map<Integer, Double> localLoad = new HashMap<>();
-        for (IRunningExecutor exec: execs) {
+        for (IRunningExecutor exec : execs) {
             double receiveLoad = getQueueLoad(exec.getReceiveQueue());
-            double sendLoad = getQueueLoad(exec.getSendQueue());
-            localLoad.put(exec.getExecutorId().get(0).intValue(), 
Math.max(receiveLoad, sendLoad));
+            localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
         }
 
         Map<Integer, Load> remoteLoad = new HashMap<>();
@@ -481,12 +470,23 @@ public class WorkerState {
         loadMapping.setLocal(localLoad);
         loadMapping.setRemote(remoteLoad);
 
-        if (now > nextUpdate.get()) {
+        if (now > nextLoadUpdate.get()) {
             receiver.sendLoadMetrics(localLoad);
-            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+            nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
         }
     }
 
+    // Checks if the tasks which had back pressure are now free again. If so, 
sends an update to other workers
+    public void refreshBackPressureStatus() {
+        LOG.debug("Checking for change in Backpressure status on worker's 
tasks");
+        boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+        if (bpSituationChanged) {
+            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+            receiver.sendBackPressureStatus(bpStatus);
+        }
+    }
+
+
     /**
      * we will wait all connections to be ready and then activate the 
spout/bolt
      * when the worker bootup.
@@ -494,8 +494,8 @@ public class WorkerState {
     public void activateWorkerWhenAllConnectionsReady() {
         int delaySecs = 0;
         int recurSecs = 1;
-        refreshActiveTimer.schedule(delaySecs, new Runnable() {
-            @Override public void run() {
+        refreshActiveTimer.schedule(delaySecs,
+            () -> {
                 if (areAllConnectionsReady()) {
                     LOG.info("All connections are ready for worker {}:{} with 
id {}", assignmentId, port, workerId);
                     isWorkerActive.set(Boolean.TRUE);
@@ -503,90 +503,83 @@ public class WorkerState {
                     refreshActiveTimer.schedule(recurSecs, () -> 
activateWorkerWhenAllConnectionsReady(), false, 0);
                 }
             }
-        });
+        );
     }
 
     public void registerCallbacks() {
         LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, 
port);
         receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
             getWorkerTopologyContext(),
-            this::transferLocal));
+            this::transferLocalBatch));
+        // Send curr BackPressure status to new clients
+        receiver.registerNewConnectionResponse(
+            () -> {
+                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+                LOG.info("Sending BackPressure status to new client. BPStatus: 
{}", bpStatus);
+                return bpStatus;
+            }
+        );
     }
 
-    public void transferLocal(List<AddressedTuple> tupleBatch) {
-        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
-        for (AddressedTuple tuple : tupleBatch) {
-            Integer executor = taskToShortExecutor.get(tuple.dest);
-            if (null == executor) {
-                LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
-                continue;
-            }
-            List<AddressedTuple> current = grouped.get(executor);
-            if (null == current) {
-                current = new ArrayList<>();
-                grouped.put(executor, current);
-            }
-            current.add(tuple);
-        }
+    /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits 
and return 'false'. 'pendingEmits' can be null */
+    public boolean tryTransferRemote(AddressedTuple tuple, 
Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
+        return workerTransfer.tryTransferRemote(tuple, pendingEmits, 
serializer);
+    }
 
-        for (Map.Entry<Integer, List<AddressedTuple>> entry : 
grouped.entrySet()) {
-            DisruptorQueue queue = 
shortExecutorReceiveQueueMap.get(entry.getKey());
-            if (null != queue) {
-                queue.publish(entry.getValue());
-            } else {
-                LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
-            }
-        }
+    public void flushRemotes() throws InterruptedException {
+        workerTransfer.flushRemotes();
     }
 
-    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> 
tupleBatch) {
-        if (trySerializeLocal) {
-            assertCanSerialize(serializer, tupleBatch);
-        }
-        List<AddressedTuple> local = new ArrayList<>();
-        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
-        for (AddressedTuple addressedTuple : tupleBatch) {
-            int destTask = addressedTuple.getDest();
-            if (taskIds.contains(destTask)) {
-                // Local task
-                local.add(addressedTuple);
-            } else {
-                // Using java objects directly to avoid performance issues in 
java code
-                if (! remoteMap.containsKey(destTask)) {
-                    remoteMap.put(destTask, new ArrayList<>());
+    public boolean tryFlushRemotes() {
+        return workerTransfer.tryFlushRemotes();
+    }
+
+    // Receives msgs from remote workers and feeds them to local executors. If 
any receiving local executor is under Back Pressure,
+    // informs other workers about back pressure situation. Runs in the 
NettyWorker thread.
+    private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
+        int lastOverflowCount = 0; // overflowQ size at the time the last 
BPStatus was sent
+
+        for (int i = 0; i < tupleBatch.size(); i++) {
+            AddressedTuple tuple = tupleBatch.get(i);
+            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+
+            // 1- try adding to main queue if its overflow is empty
+            if (queue.isEmptyOverflow()) {
+                if (queue.tryPublish(tuple)) {
+                    continue;
                 }
-                remoteMap.get(destTask).add(new TaskMessage(destTask, 
serializer.serialize(addressedTuple.getTuple())));
             }
-        }
 
-        if (!local.isEmpty()) {
-            transferLocal(local);
-        }
-        if (!remoteMap.isEmpty()) {
-            transferQueue.publish(remoteMap);
-        }
-    }
+            // 2- BP detected (i.e MainQ is full). So try adding to overflow
+            int currOverflowCount = queue.getOverflowCount();
+            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+                receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
+                lastOverflowCount = currOverflowCount;
+            } else {
 
-    // TODO: consider having a max batch size besides what disruptor does 
automagically to prevent latency issues
-    public void sendTuplesToRemoteWorker(HashMap<Integer, 
ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
-        drainer.add(packets);
-        if (batchEnd) {
-            ReentrantReadWriteLock.ReadLock readLock = 
endpointSocketLock.readLock();
-            try {
-                readLock.lock();
-                drainer.send(cachedTaskToNodePort.get(), 
cachedNodeToPortSocket.get());
-            } finally {
-                readLock.unlock();
+                if (currOverflowCount - lastOverflowCount > 10000) {
+                    // resend BP status, in case prev notification was missed 
or reordered
+                    BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+                    receiver.sendBackPressureStatus(bpStatus);
+                    lastOverflowCount = currOverflowCount;
+                    LOG.debug("Re-sent BackPressure Status. OverflowCount = 
{}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
+                }
+            }
+            if (!queue.tryPublishToOverflow(tuple)) {
+                dropMessage(tuple, queue);
             }
-            drainer.clear();
         }
     }
 
+    private void dropMessage(AddressedTuple tuple, JCQueue queue) {
+        ++dropCount;
+        queue.recordMsgDrop();
+        LOG.warn("Dropping message as overflow threshold has reached for Q = 
{}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", 
queue.getName(), queue.getOverflowCount(), dropCount, tuple);
+    }
 
-    private void assertCanSerialize(KryoTupleSerializer serializer, 
List<AddressedTuple> tuples) {
-        // Check that all of the tuples can be serialized by serializing them
-        for (AddressedTuple addressedTuple : tuples) {
-            serializer.serialize(addressedTuple.getTuple());
+    public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple 
tuple) {
+        if (trySerializeLocal) {
+            serializer.serialize(tuple.getTuple());
         }
     }
 
@@ -595,7 +588,7 @@ public class WorkerState {
             String codeDir = 
ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf,
 topologyId));
             String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
             return new WorkerTopologyContext(systemTopology, topologyConf, 
taskToComponent, componentToSortedTasks,
-                componentToStreamToFields, topologyId, codeDir, pidDir, port, 
taskIds,
+                componentToStreamToFields, topologyId, codeDir, pidDir, port, 
localTaskIds,
                 defaultSharedResources,
                 userSharedResources, cachedTaskToNodePort, assignmentId);
         } catch (IOException e) {
@@ -648,7 +641,7 @@ public class WorkerState {
     }
 
     private List<List<Long>> readWorkerExecutors(IStormClusterState 
stormClusterState, String topologyId, String assignmentId,
-        int port) {
+                                                 int port) {
         LOG.info("Reading assignments");
         List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
         executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
@@ -663,23 +656,31 @@ public class WorkerState {
         return executorsAssignedToThisWorker;
     }
 
-    private Map<List<Long>, DisruptorQueue> mkReceiveQueueMap(Map<String, 
Object> topologyConf, Set<List<Long>> executors) {
-        Map<List<Long>, DisruptorQueue> receiveQueueMap = new HashMap<>();
+    private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> 
topologyConf, Set<List<Long>> executors) {
+        Integer recvQueueSize = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
+        Integer recvBatchSize = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+        Integer overflowLimit = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));
+
+        if (recvBatchSize > recvQueueSize / 2) {
+            throw new 
IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + 
recvBatchSize +
+                " is greater than half of " + 
Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize);
+        }
+
+        IWaitStrategy backPressureWaitStrategy = 
IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
+        Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
         for (List<Long> executor : executors) {
-            receiveQueueMap.put(executor, new DisruptorQueue("receive-queue",
-                
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE)),
-                (long) 
topologyConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS),
-                
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE)),
-                (long) 
topologyConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS)));
+            receiveQueueMap.put(executor, new JCQueue("receive-queue" + 
executor.toString(),
+                recvQueueSize, overflowLimit, recvBatchSize, 
backPressureWaitStrategy));
+
         }
         return receiveQueueMap;
     }
-    
+
     private Map<String, Object> makeDefaultResources() {
         int threadPoolSize = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE));
         return ImmutableMap.of(WorkerTopologyContext.SHARED_EXECUTOR, 
Executors.newFixedThreadPool(threadPoolSize));
     }
-    
+
     private Map<String, Object> makeUserResources() {
         /* TODO: need to invoke a hook provided by the topology, giving it a 
chance to create user resources.
         * this would be part of the initialization hook
@@ -697,13 +698,12 @@ public class WorkerState {
     }
 
     /**
-     *
      * @return seq of task ids that receive messages from this worker
      */
     private Set<Integer> workerOutboundTasks() {
         WorkerTopologyContext context = getWorkerTopologyContext();
         Set<String> components = new HashSet<>();
-        for (Integer taskId : taskIds) {
+        for (Integer taskId : localTaskIds) {
             for (Map<String, Grouping> value : 
context.getTargets(context.getComponentId(taskId)).values()) {
                 components.addAll(value.keySet());
             }
@@ -719,7 +719,28 @@ public class WorkerState {
         return outboundTasks;
     }
 
+    public void haltWorkerTransfer() {
+        workerTransfer.haltTransferThd();
+    }
+
+    private static int getMaxTaskId(Map<String, List<Integer>> 
componentToSortedTasks) {
+        int maxTaskId = -1;
+        for (List<Integer> integers : componentToSortedTasks.values()) {
+            if (!integers.isEmpty()) {
+                int tempMax = integers.stream().max(Integer::compareTo).get();
+                if (tempMax > maxTaskId) {
+                    maxTaskId = tempMax;
+                }
+            }
+        }
+        return maxTaskId;
+    }
+
+    public JCQueue getTransferQueue() {
+        return workerTransfer.getTransferQueue();
+    }
+
     public interface ILocalTransferCallback {
-        void transfer(List<AddressedTuple> tupleBatch);
+        void transfer(ArrayList<AddressedTuple> tupleBatch);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
new file mode 100644
index 0000000..e57123c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined for other workers
+class WorkerTransfer implements JCQueue.Consumer {
+    static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
+
+    private final TransferDrainer drainer;
+    private WorkerState workerState;
+
+    private IWaitStrategy backPressureWaitStrategy;
+
+    private JCQueue transferQueue; // Queue of serialized TaskMessages bound for tasks on other workers
+
+    private AtomicBoolean[] remoteBackPressureStatus; // [remoteTaskId] -> true/false : indicates if the remote task is under back pressure.
+
+    public WorkerTransfer(WorkerState workerState, Map<String, Object> 
topologyConf, int maxTaskIdInTopo) {
+        this.workerState = workerState;
+        this.backPressureWaitStrategy = 
IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
+        this.drainer = new TransferDrainer();
+        this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo+1];
+        for (int i = 0; i < remoteBackPressureStatus.length; i++) {
+            remoteBackPressureStatus[i] = new AtomicBoolean(false);
+        }
+
+        Integer xferQueueSz = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE));
+        Integer xferBatchSz = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+        if (xferBatchSz > xferQueueSz / 2) {
+            throw new 
IllegalArgumentException(Config.TOPOLOGY_TRANSFER_BATCH_SIZE + ":" + 
xferBatchSz + " must be no more than half of "
+                + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz);
+        }
+
+        this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 
0, xferBatchSz, backPressureWaitStrategy);
+    }
+
+    public JCQueue getTransferQueue() {
+        return transferQueue;
+    }
+
+    AtomicBoolean[] getRemoteBackPressureStatus() {
+        return remoteBackPressureStatus;
+    }
+
+    public SmartThread makeTransferThread() {
+        return Utils.asyncLoop(() -> {
+            if (transferQueue.consume(this) == 0) {
+                return 1L;
+            }
+            return 0L;
+        });
+    }
+
+    @Override
+    public void accept(Object tuple) {
+        if (tuple == JCQueue.INTERRUPT) {
+            throw new RuntimeException(new InterruptedException("Worker 
Transfer Thread interrupted"));
+        }
+        TaskMessage tm = (TaskMessage) tuple;
+        drainer.add(tm);
+    }
+
+    @Override
+    public void flush() throws InterruptedException {
+        ReentrantReadWriteLock.ReadLock readLock = 
workerState.endpointSocketLock.readLock();
+        try {
+            readLock.lock();
+            drainer.send(workerState.cachedTaskToNodePort.get(), 
workerState.cachedNodeToPortSocket.get());
+        } finally {
+            readLock.unlock();
+        }
+        drainer.clear();
+    }
+
+    /* Not a Blocking call. If cannot emit, will add 'tuple' to 'pendingEmits' 
and return 'false'. 'pendingEmits' can be null */
+    public boolean tryTransferRemote(AddressedTuple addressedTuple, 
Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
+        if (pendingEmits != null  &&  !pendingEmits.isEmpty()) {
+            pendingEmits.add(addressedTuple);
+            return false;
+        }
+
+        if (!remoteBackPressureStatus[addressedTuple.dest].get()) {
+            TaskMessage tm = new TaskMessage(addressedTuple.getDest(), 
serializer.serialize(addressedTuple.getTuple()));
+            if (transferQueue.tryPublish(tm)) {
+                return true;
+            }
+        } else {
+            LOG.debug("Noticed Back Pressure in remote task {}", 
addressedTuple.dest);
+        }
+        if (pendingEmits != null) {
+            pendingEmits.add(addressedTuple);
+        }
+        return false;
+    }
+
+    public void flushRemotes() throws InterruptedException {
+        transferQueue.flush();
+    }
+
+    public boolean tryFlushRemotes() {
+        return transferQueue.tryFlush();
+    }
+
+
+    public void haltTransferThd() {
+        transferQueue.haltWithInterrupt();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java 
b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 6b87aed..267fe74 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -6,36 +6,42 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.executor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.dsl.ProducerType;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.UnknownHostException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
-import org.apache.storm.StormTimer;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
@@ -64,28 +70,24 @@ import org.apache.storm.stats.BoltExecutorStats;
 import org.apache.storm.stats.CommonStats;
 import org.apache.storm.stats.SpoutExecutorStats;
 import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.StormTimer;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.DisruptorBackpressureCallback;
-import org.apache.storm.utils.DisruptorQueue;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
-import org.apache.storm.utils.WorkerBackpressureThread;
+import org.jctools.queues.MpscChunkedArrayQueue;
 import org.json.simple.JSONValue;
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Callable;
-import java.util.stream.Collectors;
-
-public abstract class Executor implements Callable, EventHandler<Object> {
+public abstract class Executor implements Callable, JCQueue.Consumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
 
@@ -104,31 +106,31 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
     protected final Runnable suicideFn;
     protected final IStormClusterState stormClusterState;
     protected final Map<Integer, String> taskToComponent;
-    protected CommonStats stats;
     protected final Map<Integer, Map<Integer, Map<String, IMetric>>> 
intervalToTaskToMetricToRegistry;
     protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> 
streamToComponentToGrouper;
     protected final List<LoadAwareCustomStreamGrouping> groupers;
     protected final ReportErrorAndDie reportErrorDie;
-    protected final Callable<Boolean> sampler;
-    protected ExecutorTransfer executorTransfer;
+    protected final BooleanSupplier sampler;
     protected final String type;
-    protected final AtomicBoolean throttleOn;
-
     protected final IReportError reportError;
     protected final Random rand;
-    protected final DisruptorQueue sendQueue;
-    protected final DisruptorQueue receiveQueue;
-    protected Map<Integer, Task> idToTask;
+    protected final JCQueue receiveQueue;
     protected final Map<String, String> credentials;
     protected final Boolean isDebug;
     protected final Boolean hasEventLoggers;
-    protected String hostname;
-
+    protected final boolean ackingEnabled;
     protected final ErrorReportingMetrics errorReportingMetrics;
+    protected final MpscChunkedArrayQueue<AddressedTuple> pendingEmits = new 
MpscChunkedArrayQueue<>(1024);
+    private final AddressedTuple flushTuple;
+    protected ExecutorTransfer executorTransfer;
+    protected ArrayList<Task> idToTask;
+    protected int idToTaskBase;
+    protected String hostname;
 
-    protected Executor(WorkerState workerData, List<Long> executorId, 
Map<String, String> credentials) {
+    protected Executor(WorkerState workerData, List<Long> executorId, 
Map<String, String> credentials, String type) {
         this.workerData = workerData;
         this.executorId = executorId;
+        this.type = type;
         this.workerTopologyContext = workerData.getWorkerTopologyContext();
         this.taskIds = StormCommon.executorIdToTasks(executorId);
         this.componentId = 
workerTopologyContext.getComponentId(taskIds.get(0));
@@ -141,8 +143,7 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         this.stormActive = workerData.getIsTopologyActive();
         this.stormComponentDebug = workerData.getStormComponentToDebug();
 
-        this.sendQueue = mkExecutorBatchQueue(topoConf, executorId);
-        this.executorTransfer = new ExecutorTransfer(workerData, sendQueue, 
topoConf);
+        this.executorTransfer = new ExecutorTransfer(workerData, topoConf);
 
         this.suicideFn = workerData.getSuicideCallback();
         try {
@@ -152,19 +153,6 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
             throw Utils.wrapInRuntime(e);
         }
 
-        StormTopology topology = workerTopologyContext.getRawTopology();
-        Map<String, SpoutSpec> spouts = topology.get_spouts();
-        Map<String, Bolt> bolts = topology.get_bolts();
-        if (spouts.containsKey(componentId)) {
-            this.type = StatsUtil.SPOUT;
-            this.stats = new 
SpoutExecutorStats(ConfigUtils.samplingRate(topoConf),ObjectReader.getInt(topoConf.get(Config.NUM_STAT_BUCKETS)));
-        } else if (bolts.containsKey(componentId)) {
-            this.type = StatsUtil.BOLT;
-            this.stats = new 
BoltExecutorStats(ConfigUtils.samplingRate(topoConf),ObjectReader.getInt(topoConf.get(Config.NUM_STAT_BUCKETS)));
-        } else {
-            throw new RuntimeException("Could not find " + componentId + " in 
" + topology);
-        }
-
         this.intervalToTaskToMetricToRegistry = new HashMap<>();
         this.taskToComponent = workerData.getTaskToComponent();
         this.streamToComponentToGrouper = 
outboundComponents(workerTopologyContext, componentId, topoConf);
@@ -178,19 +166,19 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         this.reportError = new ReportError(topoConf, stormClusterState, 
stormId, componentId, workerTopologyContext);
         this.reportErrorDie = new ReportErrorAndDie(reportError, suicideFn);
         this.sampler = ConfigUtils.mkStatsSampler(topoConf);
-        this.throttleOn = workerData.getThrottleOn();
         this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
         this.rand = new Random(Utils.secureRandomLong());
         this.credentials = credentials;
         this.hasEventLoggers = StormCommon.hasEventLoggers(topoConf);
+        this.ackingEnabled = StormCommon.hasAckers(topoConf);
 
         try {
             this.hostname = Utils.hostname();
         } catch (UnknownHostException ignored) {
             this.hostname = "";
         }
-
         this.errorReportingMetrics = new ErrorReportingMetrics();
+        flushTuple = AddressedTuple.createFlushTuple(workerTopologyContext);
     }
 
     public static Executor mkExecutor(WorkerState workerState, List<Long> 
executorId, Map<String, String> credentials) {
@@ -203,25 +191,24 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         String type = getExecutorType(workerTopologyContext, componentId);
         if (StatsUtil.SPOUT.equals(type)) {
             executor = new SpoutExecutor(workerState, executorId, credentials);
-            executor.stats = new 
SpoutExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
         } else {
             executor = new BoltExecutor(workerState, executorId, credentials);
-            executor.stats = new 
BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
         }
 
+        int minId = Integer.MAX_VALUE;
         Map<Integer, Task> idToTask = new HashMap<>();
         for (Integer taskId : taskIds) {
+            minId = Math.min(minId, taskId);
             try {
                 Task task = new Task(executor, taskId);
-                executor.sendUnanchored(
-                        task, StormCommon.SYSTEM_STREAM_ID, new 
Values("startup"), executor.getExecutorTransfer());
                 idToTask.put(taskId, task);
             } catch (IOException ex) {
                 throw Utils.wrapInRuntime(ex);
             }
         }
 
-        executor.idToTask = idToTask;
+        executor.idToTaskBase = minId;
+        executor.idToTask = Utils.convertToArray(idToTask, minId);
         return executor;
     }
 
@@ -238,36 +225,58 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         }
     }
 
+    private static List<Object> All_CONFIGS() {
+        List<Object> ret = new ArrayList<Object>();
+        Config config = new Config();
+        Class<?> ConfigClass = config.getClass();
+        Field[] fields = ConfigClass.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            try {
+                Object obj = fields[i].get(null);
+                ret.add(obj);
+            } catch (IllegalArgumentException e) {
+                LOG.error(e.getMessage(), e);
+            } catch (IllegalAccessException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        return ret;
+    }
+
+    public Queue<AddressedTuple> getPendingEmits() {
+        return pendingEmits;
+    }
+
     /**
      * separated from mkExecutor in order to replace executor transfer in 
executor data for testing
      */
     public ExecutorShutdown execute() throws Exception {
         LOG.info("Loading executor tasks " + componentId + ":" + executorId);
 
-        registerBackpressure();
-        Utils.SmartThread systemThreads =
-                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), 
reportErrorDie);
-
         String handlerName = componentId + "-executor" + executorId;
-        Utils.SmartThread handlers =
+        Utils.SmartThread handler =
                 Utils.asyncLoop(this, false, reportErrorDie, 
Thread.NORM_PRIORITY, true, true, handlerName);
 
         LOG.info("Finished loading executor " + componentId + ":" + 
executorId);
-        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, 
handlers), idToTask, receiveQueue, sendQueue);
+        return new ExecutorShutdown(this, Lists.newArrayList(handler), 
idToTask, receiveQueue);
     }
 
     public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws 
Exception;
 
-    @SuppressWarnings("unchecked")
     @Override
-    public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
-        ArrayList<AddressedTuple> addressedTuples = 
(ArrayList<AddressedTuple>) event;
-        for (AddressedTuple addressedTuple : addressedTuples) {
-            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
-            int taskId = addressedTuple.getDest();
-            if (isDebug) {
-                LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
-            }
+    public void accept(Object event) {
+        if (event == JCQueue.INTERRUPT) {
+            throw new RuntimeException(new InterruptedException("JCQ 
processing interrupted"));
+        }
+        AddressedTuple addressedTuple = (AddressedTuple) event;
+        int taskId = addressedTuple.getDest();
+
+        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+        if (isDebug) {
+            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, 
tuple);
+        }
+
+        try {
             if (taskId != AddressedTuple.BROADCAST_DEST) {
                 tupleActionFn(taskId, tuple);
             } else {
@@ -275,13 +284,20 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
                     tupleActionFn(t, tuple);
                 }
             }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 
-    public void metricsTick(Task taskData, TupleImpl tuple) {
+    @Override
+    public void flush() {
+        // NO-OP
+    }
+
+    public void metricsTick(Task task, TupleImpl tuple) {
         try {
             Integer interval = tuple.getInteger(0);
-            int taskId = taskData.getTaskId();
+            int taskId = task.getTaskId();
             Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = 
intervalToTaskToMetricToRegistry.get(interval);
             Map<String, IMetric> nameToRegistry = null;
             if (taskToMetricToRegistry != null) {
@@ -289,8 +305,8 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
             }
             if (nameToRegistry != null) {
                 IMetricsConsumer.TaskInfo taskInfo = new 
IMetricsConsumer.TaskInfo(
-                        hostname, workerTopologyContext.getThisWorkerPort(),
-                        componentId, taskId, Time.currentTimeSecs(), interval);
+                    hostname, workerTopologyContext.getThisWorkerPort(),
+                    componentId, taskId, Time.currentTimeSecs(), interval);
                 List<IMetricsConsumer.DataPoint> dataPoints = new 
ArrayList<>();
                 for (Map.Entry<String, IMetric> entry : 
nameToRegistry.entrySet()) {
                     IMetric metric = entry.getValue();
@@ -301,8 +317,9 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
                     }
                 }
                 if (!dataPoints.isEmpty()) {
-                    sendUnanchored(taskData, Constants.METRICS_STREAM_ID,
-                            new Values(taskInfo, dataPoints), 
executorTransfer);
+                    task.sendUnanchored(Constants.METRICS_STREAM_ID,
+                        new Values(taskInfo, dataPoints), executorTransfer, 
pendingEmits);
+                    executorTransfer.flush();
                 }
             }
         } catch (Exception e) {
@@ -313,107 +330,74 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
     protected void setupMetrics() {
         for (final Integer interval : 
intervalToTaskToMetricToRegistry.keySet()) {
             StormTimer timerTask = workerData.getUserTimer();
-            timerTask.scheduleRecurring(interval, interval, new Runnable() {
-                @Override
-                public void run() {
-                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(interval),
-                            (int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
-                    List<AddressedTuple> metricsTickTuple =
-                            Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-                    receiveQueue.publish(metricsTickTuple);
+            timerTask.scheduleRecurring(interval, interval,
+                () -> {
+                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(interval), Constants.SYSTEM_COMPONENT_ID,
+                        (int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
+                    AddressedTuple metricsTickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+                    try {
+                        receiveQueue.publish(metricsTickTuple);
+                        receiveQueue.flush();  // avoid buffering
+                    } catch (InterruptedException e) {
+                        LOG.warn("Thread interrupted when publishing metrics. 
Setting interrupt flag.");
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
                 }
-            });
-        }
-    }
-
-    public void sendUnanchored(Task task, String stream, List<Object> values, 
ExecutorTransfer transfer) {
-        Tuple tuple = task.getTuple(stream, values);
-        List<Integer> tasks = task.getOutgoingTasks(stream, values);
-        for (Integer t : tasks) {
-            transfer.transfer(t, tuple);
-        }
-    }
-
-    /**
-     * Send sampled data to the eventlogger if the global or component level 
debug flag is set (via nimbus api).
-     */
-    public void sendToEventLogger(Executor executor, Task taskData, List 
values,
-                                  String componentId, Object messageId, Random 
random) {
-        Map<String, DebugOptions> componentDebug = 
executor.getStormComponentDebug().get();
-        DebugOptions debugOptions = componentDebug.get(componentId);
-        if (debugOptions == null) {
-            debugOptions = componentDebug.get(executor.getStormId());
-        }
-        double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? 
debugOptions.get_samplingpct() : 0;
-        if (spct > 0 && (random.nextDouble() * 100) < spct) {
-            sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
-                    new Values(componentId, messageId, 
System.currentTimeMillis(), values),
-                    executor.getExecutorTransfer());
+            );
         }
     }
 
-    public void reflectNewLoadMapping(LoadMapping loadMapping) {
-        for (LoadAwareCustomStreamGrouping g : groupers) {
-            g.refreshLoad(loadMapping);
-        }
-    }
-
-    private void registerBackpressure() {
-        receiveQueue.registerBackpressureCallback(new 
DisruptorBackpressureCallback() {
-            @Override
-            public void highWaterMark() throws Exception {
-                LOG.debug("executor " + executorId + " is congested, set 
backpressure flag true");
-                
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
-            }
-
-            @Override
-            public void lowWaterMark() throws Exception {
-                LOG.debug("executor " + executorId + " is not-congested, set 
backpressure flag false");
-                
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
-            }
-        });
-        
receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-        
receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-        
receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE),
 false));
-    }
-
     protected void setupTicks(boolean isSpout) {
         final Integer tickTimeSecs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
-        boolean enableMessageTimeout = (Boolean) 
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
         if (tickTimeSecs != null) {
+            boolean enableMessageTimeout = (Boolean) 
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
             if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && 
Utils.isSystemId(componentId))
                 || (!enableMessageTimeout && isSpout)) {
                 LOG.info("Timeouts disabled for executor {}:{}", componentId, 
executorId);
             } else {
                 StormTimer timerTask = workerData.getUserTimer();
-                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs, () -> {
-                    // We should create a new tick tuple for each recurrence 
instead of sharing object
-                    // More detail on 
https://issues.apache.org/jira/browse/STORM-2912
-                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new 
Values(tickTimeSecs),
-                            (int) Constants.SYSTEM_TASK_ID, 
Constants.SYSTEM_TICK_STREAM_ID);
-                    final List<AddressedTuple> tickTuple =
-                            Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-                    receiveQueue.publish(tickTuple);
-                });
+                timerTask.scheduleRecurring(tickTimeSecs, tickTimeSecs,
+                    () -> {
+                        TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(tickTimeSecs),
+                            Constants.SYSTEM_COMPONENT_ID, (int) 
Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
+                        AddressedTuple tickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+                        try {
+                            receiveQueue.publish(tickTuple);
+                            receiveQueue.flush(); // avoid buffering
+                        } catch (InterruptedException e) {
+                            LOG.warn("Thread interrupted when emitting tick 
tuple. Setting interrupt flag.");
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                );
             }
         }
     }
 
+    public void reflectNewLoadMapping(LoadMapping loadMapping) {
+        for (LoadAwareCustomStreamGrouping g : groupers) {
+            g.refreshLoad(loadMapping);
+        }
+    }
 
-    private DisruptorQueue mkExecutorBatchQueue(Map<String, Object> topoConf, 
List<Long> executorId) {
-        int sendSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
-        int waitTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS));
-        int batchSize = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE));
-        int batchTimeOutMs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS));
-        return new DisruptorQueue("executor" + executorId + "-send-queue", 
ProducerType.MULTI,
-                sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
+    // Called by flush-tuple-timer thread
+    public boolean publishFlushTuple() {
+        if (receiveQueue.tryPublishDirect(flushTuple)) {
+            LOG.debug("Published Flush tuple to: {} ", getComponentId());
+            return true;
+        } else {
+            LOG.debug("RecvQ is currently full, will retry publishing Flush 
Tuple later to : {}", getComponentId());
+            return false;
+        }
     }
 
     /**
      * Returns map of stream id to component id to grouper
      */
     private Map<String, Map<String, LoadAwareCustomStreamGrouping>> 
outboundComponents(
-            WorkerTopologyContext workerTopologyContext, String componentId, 
Map<String, Object> topoConf) {
+        WorkerTopologyContext workerTopologyContext, String componentId, 
Map<String, Object> topoConf) {
         Map<String, Map<String, LoadAwareCustomStreamGrouping>> ret = new 
HashMap<>();
 
         Map<String, Map<String, Grouping>> outputGroupings = 
workerTopologyContext.getTargets(componentId);
@@ -427,7 +411,7 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
                 Grouping grouping = cg.getValue();
                 List<Integer> outTasks = 
workerTopologyContext.getComponentTasks(component);
                 LoadAwareCustomStreamGrouping grouper = 
GrouperFactory.mkGrouper(
-                        workerTopologyContext, componentId, streamId, 
outFields, grouping, outTasks, topoConf);
+                    workerTopologyContext, componentId, streamId, outFields, 
grouping, outTasks, topoConf);
                 componentGrouper.put(component, grouper);
             }
             if (componentGrouper.size() > 0) {
@@ -444,6 +428,10 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         return ret;
     }
 
+    // 
=============================================================================
+    // ============================ getter methods 
=================================
+    // 
=============================================================================
+
     private Map normalizedComponentConf(Map<String, Object> topoConf, 
WorkerTopologyContext topologyContext, String componentId) {
         List<Object> keysToRemove = All_CONFIGS();
         keysToRemove.remove(Config.TOPOLOGY_DEBUG);
@@ -485,10 +473,6 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         return ret;
     }
 
-    // 
=============================================================================
-    // ============================ getter methods 
=================================
-    // 
=============================================================================
-
     public List<Long> getExecutorId() {
         return executorId;
     }
@@ -505,7 +489,7 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         return openOrPrepareWasCalled;
     }
 
-    public Map getStormConf() {
+    public Map getTopoConf() {
         return topoConf;
     }
 
@@ -513,13 +497,7 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         return stormId;
     }
 
-    public CommonStats getStats() {
-        return stats;
-    }
-
-    public AtomicBoolean getThrottleOn() {
-        return throttleOn;
-    }
+    abstract public CommonStats getStats();
 
     public String getType() {
         return type;
@@ -545,26 +523,18 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         return workerTopologyContext;
     }
 
-    public Callable<Boolean> getSampler() {
-        return sampler;
+    public boolean samplerCheck() {
+        return sampler.getAsBoolean();
     }
 
     public AtomicReference<Map<String, DebugOptions>> getStormComponentDebug() 
{
         return stormComponentDebug;
     }
 
-    public DisruptorQueue getReceiveQueue() {
+    public JCQueue getReceiveQueue() {
         return receiveQueue;
     }
 
-    public boolean getBackpressure() {
-        return receiveQueue.getThrottleOn();
-    }
-
-    public DisruptorQueue getTransferWorkerQueue() {
-        return sendQueue;
-    }
-
     public IStormClusterState getStormClusterState() {
         return stormClusterState;
     }
@@ -590,22 +560,4 @@ public abstract class Executor implements Callable, 
EventHandler<Object> {
         this.executorTransfer = executorTransfer;
     }
 
-    private static List<Object> All_CONFIGS() {
-        List<Object> ret = new ArrayList<Object>();
-        Config config = new Config();
-        Class<?> ConfigClass = config.getClass();
-        Field[] fields = ConfigClass.getFields();
-        for (int i = 0; i < fields.length; i++) {
-            try {
-                Object obj = fields[i].get(null);
-                ret.add(obj);
-            } catch (IllegalArgumentException e) {
-                LOG.error(e.getMessage(), e);
-            } catch (IllegalAccessException e) {
-                LOG.error(e.getMessage(), e);
-            }
-        }
-        return ret;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java 
b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
index c7691e4..996a557 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.executor;
 
-import com.google.common.collect.Lists;
 import org.apache.storm.Constants;
 import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.daemon.Task;
@@ -31,30 +30,28 @@ import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 public class ExecutorShutdown implements Shutdownable, IRunningExecutor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorShutdown.class);
+
     private final Executor executor;
     private final List<Utils.SmartThread> threads;
-    private final Map<Integer, Task> taskDatas;
-    private final DisruptorQueue receiveQueue;
-    private final DisruptorQueue sendQueue;
+    private final ArrayList<Task> taskDatas;
+    private final JCQueue receiveQueue;
 
-    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> 
threads, Map<Integer, Task> taskDatas,
-                            DisruptorQueue receiveQueue, DisruptorQueue 
sendQueue) {
+    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> 
threads, ArrayList<Task> taskDatas, JCQueue recvQueue) {
         this.executor = executor;
         this.threads = threads;
         this.taskDatas = taskDatas;
-        this.receiveQueue = receiveQueue;
-        this.sendQueue = sendQueue;
+        this.receiveQueue = recvQueue;
     }
 
     @Override
@@ -69,30 +66,29 @@ public class ExecutorShutdown implements Shutdownable, 
IRunningExecutor {
 
     @Override
     public void credentialsChanged(Credentials credentials) {
-        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), 
new Values(credentials), (int) Constants.SYSTEM_TASK_ID,
-                Constants.CREDENTIALS_CHANGED_STREAM_ID);
-        List<AddressedTuple> addressedTuple = Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
-        executor.getReceiveQueue().publish(addressedTuple);
+        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), 
new Values(credentials),
+                Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, 
Constants.CREDENTIALS_CHANGED_STREAM_ID);
+        AddressedTuple addressedTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+        try {
+            executor.getReceiveQueue().publish(addressedTuple);
+            executor.getReceiveQueue().flush();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    @Override
     public void loadChanged(LoadMapping loadMapping) {
         executor.reflectNewLoadMapping(loadMapping);
     }
 
     @Override
-    public boolean getBackPressureFlag() {
-        return executor.getBackpressure();
-    }
-
-    @Override
-    public DisruptorQueue getReceiveQueue() {
+    public JCQueue getReceiveQueue() {
         return receiveQueue;
     }
 
     @Override
-    public DisruptorQueue getSendQueue() {
-        return sendQueue;
+    public boolean publishFlushTuple() {
+        return executor.publishFlushTuple();
     }
 
     @Override
@@ -100,7 +96,6 @@ public class ExecutorShutdown implements Shutdownable, 
IRunningExecutor {
         try {
             LOG.info("Shutting down executor " + executor.getComponentId() + 
":" + executor.getExecutorId());
             executor.getReceiveQueue().haltWithInterrupt();
-            executor.getTransferWorkerQueue().haltWithInterrupt();
             for (Utils.SmartThread t : threads) {
                 t.interrupt();
             }
@@ -109,7 +104,9 @@ public class ExecutorShutdown implements Shutdownable, 
IRunningExecutor {
                 t.join();
             }
             executor.getStats().cleanupStats();
-            for (Task task : taskDatas.values()) {
+            for (Task task : taskDatas) {
+                if (task==null)
+                    continue;
                 TopologyContext userContext = task.getUserContext();
                 for (ITaskHook hook : userContext.getHooks()) {
                     hook.cleanup();
@@ -117,7 +114,9 @@ public class ExecutorShutdown implements Shutdownable, 
IRunningExecutor {
             }
             executor.getStormClusterState().disconnect();
             if (executor.getOpenOrPrepareWasCalled().get()) {
-                for (Task task : taskDatas.values()) {
+                for (Task task : taskDatas) {
+                    if (task==null)
+                        continue;
                     Object object = task.getTaskObject();
                     if (object instanceof ISpout) {
                         ((ISpout) object).close();

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java 
b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
index 1e66305..726f1fe 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
@@ -17,72 +17,102 @@
  */
 package org.apache.storm.executor;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer  {
     private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorTransfer.class);
 
     private final WorkerState workerData;
-    private final DisruptorQueue batchTransferQueue;
-    private final Map<String, Object> topoConf;
     private final KryoTupleSerializer serializer;
-    private final MutableObject cachedEmit;
     private final boolean isDebug;
+    private int indexingBase = 0;
+    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => 
queue : List of all recvQs local to this worker
+    private AtomicReferenceArray<JCQueue> queuesToFlush; // 
[taskId-indexingBase] => queue, some entries can be null. : outbound Qs for 
this executor instance
 
-    public ExecutorTransfer(WorkerState workerData, DisruptorQueue 
batchTransferQueue, Map<String, Object> topoConf) {
+
+    public ExecutorTransfer(WorkerState workerData, Map<String, Object> 
topoConf) {
         this.workerData = workerData;
-        this.batchTransferQueue = batchTransferQueue;
-        this.topoConf = topoConf;
         this.serializer = new KryoTupleSerializer(topoConf, 
workerData.getWorkerTopologyContext());
-        this.cachedEmit = new MutableObject(new ArrayList<>());
         this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
     }
 
-    public void transfer(int task, Tuple tuple) {
-        AddressedTuple val = new AddressedTuple(task, tuple);
+    // to be called after all Executor objects in the worker are created and 
before this object is used
+    public void initLocalRecvQueues() {
+        Integer minTaskId = 
workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+        this.localReceiveQueues = Utils.convertToArray( 
workerData.getLocalReceiveQueues(), minTaskId);
+        this.indexingBase = minTaskId;
+        this.queuesToFlush = new 
AtomicReferenceArray<JCQueue>(localReceiveQueues.size());
+    }
+
+    // adds addressedTuple to destination Q if it is not full. else adds to 
pendingEmits (if its not null)
+    public boolean tryTransfer(AddressedTuple addressedTuple, 
Queue<AddressedTuple> pendingEmits) {
         if (isDebug) {
-            LOG.info("TRANSFERRING tuple {}", val);
+            LOG.info("TRANSFERRING tuple {}", addressedTuple);
+        }
+
+        JCQueue localQueue = getLocalQueue(addressedTuple);
+        if (localQueue!=null) {
+            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
         }
-        batchTransferQueue.publish(val);
+        return workerData.tryTransferRemote(addressedTuple, pendingEmits, 
serializer);
     }
 
-    @VisibleForTesting
-    public DisruptorQueue getBatchTransferQueue() {
-        return this.batchTransferQueue;
+
+    // flushes local and remote messages
+    public void flush() throws InterruptedException {
+        flushLocal();
+        workerData.flushRemotes();
     }
 
-    @Override
-    public Object call() throws Exception {
-        batchTransferQueue.consumeBatchWhenAvailable(this);
-        return 0L;
+    private void flushLocal() throws InterruptedException {
+        for (int i = 0; i < queuesToFlush.length(); i++) {
+            JCQueue q = queuesToFlush.get(i);
+            if(q!=null) {
+                q.flush();
+                queuesToFlush.set(i, null);
+            }
+        }
     }
 
-    public String getName() {
-        return batchTransferQueue.getName();
+
+    public JCQueue getLocalQueue(AddressedTuple tuple) {
+        if ( (tuple.dest-indexingBase) >= localReceiveQueues.size()) {
+            return null;
+        }
+        return localReceiveQueues.get(tuple.dest - indexingBase);
     }
 
-    @Override
-    public void onEvent(Object event, long sequence, boolean endOfBatch) 
throws Exception {
-        ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
-        cachedEvents.add(event);
-        if (endOfBatch) {
-            workerData.transfer(serializer, cachedEvents);
-            cachedEmit.setObject(new ArrayList<>());
+    /** Adds tuple to localQueue (if overflow is empty). If localQueue is full 
adds to pendingEmits instead.
+     *  pendingEmits can be null.
+     *  Returns false if unable to add to localQueue.
+     */
+    public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, 
Queue<AddressedTuple> pendingEmits) {
+        workerData.checkSerialize(serializer, tuple);
+        if (pendingEmits != null) {
+            if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
+                queuesToFlush.set(tuple.dest - indexingBase, localQueue);
+                return true;
+            } else {
+                pendingEmits.add(tuple);
+                return false;
+            }
+        } else {
+          return localQueue.tryPublish(tuple);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java 
b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
index 4b7f483..f9e6822 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/IRunningExecutor.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.executor;
 
 import org.apache.storm.generated.Credentials;
@@ -22,15 +23,19 @@ import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.grouping.LoadMapping;
 
 import java.util.List;
-import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.JCQueue;
 
 public interface IRunningExecutor {
 
     ExecutorStats renderStats();
+
     List<Long> getExecutorId();
+
     void credentialsChanged(Credentials credentials);
+
     void loadChanged(LoadMapping loadMapping);
-    boolean getBackPressureFlag();
-    DisruptorQueue getReceiveQueue();
-    DisruptorQueue getSendQueue();
+
+    JCQueue getReceiveQueue();
+
+    boolean publishFlushTuple();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java 
b/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
index 4f2811b..19c38b6 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/LocalExecutor.java
@@ -19,11 +19,12 @@
 package org.apache.storm.executor;
 
 import org.apache.storm.daemon.worker.WorkerState;
-import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.utils.RegisteredGlobalState;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class LocalExecutor {
@@ -33,14 +34,13 @@ public class LocalExecutor {
     public static Executor mkExecutor(WorkerState workerState, List<Long> 
executorId, Map<String, String> initialCredentials)
         throws Exception {
         Executor executor = Executor.mkExecutor(workerState, executorId, 
initialCredentials);
-        executor.setLocalExecutorTransfer(new ExecutorTransfer(workerState, 
executor.getTransferWorkerQueue(),
-            executor.getStormConf()) {
+        executor.setLocalExecutorTransfer(new ExecutorTransfer(workerState, 
executor.getTopoConf()) {
             @Override
-            public void transfer(int task, Tuple tuple) {
+            public boolean tryTransfer(AddressedTuple tuple, 
Queue<AddressedTuple> pendingEmits) {
                 if (null != trackId) {
                     ((AtomicInteger) ((Map) 
RegisteredGlobalState.getState(trackId)).get("transferred")).incrementAndGet();
                 }
-                super.transfer(task, tuple);
+                return super.tryTransfer(tuple, pendingEmits);
             }
         });
         return executor;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java 
b/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
index 4b6d0fa..7429c3f 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java
@@ -87,4 +87,12 @@ public class TupleInfo implements Serializable {
     public void setTaskId(int taskId) {
         this.taskId = taskId;
     }
+
+    public void clear() {
+        messageId = null;
+        stream = null;
+        values = null;
+        timestamp = 0;
+        id = null;
+    }
 }

Reply via email to