Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158204261 --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java --- @@ -17,72 +17,123 @@ */ package org.apache.storm.executor; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.EventHandler; import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.DisruptorQueue; -import org.apache.storm.utils.MutableObject; +import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Queue; -public class ExecutorTransfer implements EventHandler, Callable { +public class ExecutorTransfer { private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class); private final WorkerState workerData; - private final DisruptorQueue batchTransferQueue; - private final Map<String, Object> topoConf; private final KryoTupleSerializer serializer; - private final MutableObject cachedEmit; private final boolean isDebug; + private final int producerBatchSz; + private int remotesBatchSz = 0; + private int indexingBase = 0; + private ArrayList<JCQueue> localReceiveQueues; // [taksid-indexingBase] => queue : List of all recvQs local to this worker + private ArrayList<JCQueue> queuesToFlush; // [taksid-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance --- End diff -- nit: Same here.
---