Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2502#discussion_r159962361 --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java --- @@ -17,72 +17,124 @@ */ package org.apache.storm.executor; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.EventHandler; import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.DisruptorQueue; -import org.apache.storm.utils.MutableObject; +import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Queue; -public class ExecutorTransfer implements EventHandler, Callable { +// Every executor has an instance of this class +public class ExecutorTransfer { private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class); private final WorkerState workerData; - private final DisruptorQueue batchTransferQueue; - private final Map<String, Object> topoConf; private final KryoTupleSerializer serializer; - private final MutableObject cachedEmit; private final boolean isDebug; + private final int producerBatchSz; + private int remotesBatchSz = 0; + private int indexingBase = 0; + private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker + private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. 
: outbound Qs for this executor instance - public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) { + + public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) { this.workerData = workerData; - this.batchTransferQueue = batchTransferQueue; - this.topoConf = topoConf; this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext()); - this.cachedEmit = new MutableObject(new ArrayList<>()); this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false); + this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE)); + } + + // to be called after all Executor objects in the worker are created and before this object is used + public void initLocalRecvQueues() { + Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get(); + this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId); + this.indexingBase = minTaskId; + this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) ); } - public void transfer(int task, Tuple tuple) { - AddressedTuple val = new AddressedTuple(task, tuple); + // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null) + public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) { if (isDebug) { - LOG.info("TRANSFERRING tuple {}", val); + LOG.info("TRANSFERRING tuple {}", addressedTuple); + } + + JCQueue localQueue = getLocalQueue(addressedTuple); + if (localQueue!=null) { + return tryTransferLocal(addressedTuple, localQueue, pendingEmits); + } else { + if (remotesBatchSz >= producerBatchSz) { + if ( !workerData.tryFlushRemotes() ) { + if (pendingEmits != null) { + pendingEmits.add(addressedTuple); + } + return false; + } + remotesBatchSz = 0; --- End diff -- Do we have a race condition here? 
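I believe this method can be called from multiple threads. If so, we now have to worry about keeping `remotesBatchSz` consistent. It is a plain `int` field that is compared against `producerBatchSz` and then reset to 0 without any synchronization. Concurrent callers could therefore interleave those steps and leave the counter in an inconsistent state.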
---