GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159962361
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
    @@ -17,72 +17,124 @@
      */
     package org.apache.storm.executor;
     
    -import com.google.common.annotations.VisibleForTesting;
    -import com.lmax.disruptor.EventHandler;
     import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
     import org.apache.storm.serialization.KryoTupleSerializer;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.DisruptorQueue;
    -import org.apache.storm.utils.MutableObject;
    +import org.apache.storm.utils.JCQueue;
     import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
    +import java.util.Collections;
     import java.util.Map;
    -import java.util.concurrent.Callable;
    +import java.util.Queue;
     
    -public class ExecutorTransfer implements EventHandler, Callable {
    +// Every executor has an instance of this class
    +public class ExecutorTransfer  {
         private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorTransfer.class);
     
         private final WorkerState workerData;
    -    private final DisruptorQueue batchTransferQueue;
    -    private final Map<String, Object> topoConf;
         private final KryoTupleSerializer serializer;
    -    private final MutableObject cachedEmit;
         private final boolean isDebug;
    +    private final int producerBatchSz;
    +    private int remotesBatchSz = 0;
    +    private int indexingBase = 0;
    +    private ArrayList<JCQueue> localReceiveQueues; // 
[taskId-indexingBase] => queue : List of all recvQs local to this worker
    +    private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => 
queue, some entries can be null. : outbound Qs for this executor instance
     
    -    public ExecutorTransfer(WorkerState workerData, DisruptorQueue 
batchTransferQueue, Map<String, Object> topoConf) {
    +
    +    public ExecutorTransfer(WorkerState workerData, Map<String, Object> 
topoConf) {
             this.workerData = workerData;
    -        this.batchTransferQueue = batchTransferQueue;
    -        this.topoConf = topoConf;
             this.serializer = new KryoTupleSerializer(topoConf, 
workerData.getWorkerTopologyContext());
    -        this.cachedEmit = new MutableObject(new ArrayList<>());
             this.isDebug = 
ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.producerBatchSz = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +    }
    +
    +    // to be called after all Executor objects in the worker are created 
and before this object is used
    +    public void initLocalRecvQueues() {
    +        Integer minTaskId = 
workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
    +        this.localReceiveQueues = Utils.convertToArray( 
workerData.getLocalReceiveQueues(), minTaskId);
    +        this.indexingBase = minTaskId;
    +        this.queuesToFlush = new 
ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
         }
     
    -    public void transfer(int task, Tuple tuple) {
    -        AddressedTuple val = new AddressedTuple(task, tuple);
    +    // adds addressedTuple to destination Q if it is not full. else adds 
to pendingEmits (if its not null)
    +    public boolean tryTransfer(AddressedTuple addressedTuple, 
Queue<AddressedTuple> pendingEmits) {
             if (isDebug) {
    -            LOG.info("TRANSFERRING tuple {}", val);
    +            LOG.info("TRANSFERRING tuple {}", addressedTuple);
    +        }
    +
    +        JCQueue localQueue = getLocalQueue(addressedTuple);
    +        if (localQueue!=null) {
    +            return tryTransferLocal(addressedTuple, localQueue, 
pendingEmits);
    +        }  else  {
    +            if (remotesBatchSz >= producerBatchSz) {
    +                if ( !workerData.tryFlushRemotes() ) {
    +                    if (pendingEmits != null) {
    +                        pendingEmits.add(addressedTuple);
    +                    }
    +                    return false;
    +                }
    +                remotesBatchSz = 0;
    --- End diff --
    
    Do we have a race condition here? I believe this method can be called from
multiple threads. If so, we now have to worry about keeping remotesBatchSz
consistent: it is a plain (non-volatile, unsynchronized) int field that this
method both reads and resets.


---

Reply via email to