Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158924627
--- Diff:
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,123 @@
*/
package org.apache.storm.executor;
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
-public class ExecutorTransfer implements EventHandler, Callable {
+public class ExecutorTransfer {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutorTransfer.class);
private final WorkerState workerData;
- private final DisruptorQueue batchTransferQueue;
- private final Map<String, Object> topoConf;
private final KryoTupleSerializer serializer;
- private final MutableObject cachedEmit;
private final boolean isDebug;
+ private final int producerBatchSz;
+ private int remotesBatchSz = 0;
+ private int indexingBase = 0;
+ private ArrayList<JCQueue> localReceiveQueues; //
[taskId-indexingBase] => queue : List of all recvQs local to this worker
--- End diff --
fixed.
---