Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158208197
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---
    @@ -18,105 +18,188 @@
     package org.apache.storm.executor.bolt;
     
     import com.google.common.collect.ImmutableMap;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
    +
    +import org.apache.storm.Config;
     import org.apache.storm.Constants;
     import org.apache.storm.ICredentialsListener;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.policy.IWaitStrategy;
     import org.apache.storm.daemon.Task;
     import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
     import org.apache.storm.daemon.worker.WorkerState;
     import org.apache.storm.executor.Executor;
     import org.apache.storm.hooks.info.BoltExecuteInfo;
    +import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
    +import org.apache.storm.policy.WaitStrategyPark;
     import org.apache.storm.stats.BoltExecutorStats;
     import org.apache.storm.task.IBolt;
    -import org.apache.storm.task.IOutputCollector;
     import org.apache.storm.task.OutputCollector;
     import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.JCQueue.ExitCondition;
    +import org.apache.storm.utils.ReflectionUtils;
     import org.apache.storm.utils.Utils;
    -import org.apache.storm.utils.DisruptorQueue;
    +import org.apache.storm.utils.JCQueue;
     import org.apache.storm.utils.Time;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.Map;
     import java.util.concurrent.Callable;
    +import java.util.function.BooleanSupplier;
     
     public class BoltExecutor extends Executor {
     
         private static final Logger LOG = 
LoggerFactory.getLogger(BoltExecutor.class);
     
    -    private final Callable<Boolean> executeSampler;
    +    private final BooleanSupplier executeSampler;
    +    private final boolean isSystemBoltExecutor;
    +    private final IWaitStrategy consumeWaitStrategy;       // employed 
when no incoming data
    +    private final IWaitStrategy backPressureWaitStrategy;  // employed 
when outbound path is congested
    +    private BoltOutputCollectorImpl outputCollector;
     
         public BoltExecutor(WorkerState workerData, List<Long> executorId, 
Map<String, String> credentials) {
             super(workerData, executorId, credentials);
             this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
    +        this.isSystemBoltExecutor =  (executorId == 
Constants.SYSTEM_EXECUTOR_ID );
    +        if (isSystemBoltExecutor) {
    +            this.consumeWaitStrategy = makeSystemBoltWaitStrategy();
    +        } else {
    +            this.consumeWaitStrategy = 
ReflectionUtils.newInstance((String) 
topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY));
    +            this.consumeWaitStrategy.prepare(topoConf, 
WAIT_SITUATION.BOLT_WAIT);
    +        }
    +        this.backPressureWaitStrategy = 
ReflectionUtils.newInstance((String) 
topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
    +        this.backPressureWaitStrategy.prepare(topoConf, 
WAIT_SITUATION.BACK_PRESSURE_WAIT);
    +
         }
     
    -    public void init(Map<Integer, Task> idToTask) {
    +    private static IWaitStrategy makeSystemBoltWaitStrategy() {
    +        WaitStrategyPark ws = new WaitStrategyPark();
    +        HashMap conf = new HashMap<String,Object>();
    +        conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000);
    +        ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT);
    +        return ws;
    +    }
    +
    +    public void init(ArrayList<Task> idToTask, int idToTaskBase) {
    +        executorTransfer.initLocalRecvQueues();
             while (!stormActive.get()) {
                 Utils.sleep(100);
             }
     
    -        this.errorReportingMetrics.registerAll(topoConf, 
idToTask.values().iterator().next().getUserContext());
    -
    -        LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
    -        for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
    -            Task taskData = entry.getValue();
    +        if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System 
bolt doesn't call reportError()
    +            this.errorReportingMetrics.registerAll(topoConf, 
idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
    +        }
    +        LOG.info("Preparing bolt {}:{}", componentId, getTaskIds());
    +        for (Task taskData : idToTask) {
    +            if (taskData == null) {
    +                //This happens if the min id is too small
    +                continue;
    +            }
                 IBolt boltObject = (IBolt) taskData.getTaskObject();
                 TopologyContext userContext = taskData.getUserContext();
                 taskData.getBuiltInMetrics().registerAll(topoConf, 
userContext);
                 if (boltObject instanceof ICredentialsListener) {
                     ((ICredentialsListener) 
boltObject).setCredentials(credentials);
                 }
                 if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
    -                Map<String, DisruptorQueue> map = 
ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
    -                        "transfer", workerData.getTransferQueue());
    +                Map<String, JCQueue> map = ImmutableMap.of("receive", 
receiveQueue, "transfer", workerData.getTransferQueue());
                     BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, 
userContext);
     
    -                Map cachedNodePortToSocket = (Map) 
workerData.getCachedNodeToPortSocket().get();
    +                Map cachedNodePortToSocket = 
workerData.getCachedNodeToPortSocket().get();
                     
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, 
topoConf, userContext);
                     
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), 
topoConf, userContext);
                 } else {
    -                Map<String, DisruptorQueue> map = 
ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
    +                Map<String, JCQueue> map = ImmutableMap.of("receive", 
receiveQueue);
                     BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, 
userContext);
                 }
     
    -            IOutputCollector outputCollector = new 
BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, 
isDebug);
    +            this.outputCollector = new BoltOutputCollectorImpl(this, 
taskData, rand, hasEventLoggers, ackingEnabled, isDebug);
                 boltObject.prepare(topoConf, userContext, new 
OutputCollector(outputCollector));
             }
             openOrPrepareWasCalled.set(true);
    -        LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
    +        LOG.info("Prepared bolt {}:{}", componentId, taskIds);
             setupMetrics();
         }
     
         @Override
    -    public Callable<Object> call() throws Exception {
    -        init(idToTask);
    +    public Callable<Long> call() throws Exception {
    +        init(idToTask, idToTaskBase);
     
    -        return new Callable<Object>() {
    +        return new Callable<Long>() {
    +            private ExitCondition tillNoPendingEmits = () -> 
pendingEmits.isEmpty();
    +            int bpIdleCount = 0;
    +            int consumeIdleCounter = 0;
                 @Override
    -            public Object call() throws Exception {
    -                receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this);
    +            public Long call() throws Exception {
    +                boolean pendingEmitsIsEmpty = tryFlushPendingEmits();
    +                if (pendingEmitsIsEmpty) {
    +                    if (bpIdleCount!=0) {
    +                        LOG.debug("Ending Back Pressure Wait stretch : 
{}", bpIdleCount);
    +                    }
    +                    bpIdleCount = 0;
    +                    int consumeCount = 
receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits);
    +                    if (consumeCount == 0) {
    +                        if (consumeIdleCounter==0) {
    +                            LOG.debug("Invoking consume wait strategy");
    +                        }
    +                        consumeIdleCounter = 
consumeWaitStrategy.idle(consumeIdleCounter);
    --- End diff --
    
    Could we let the wait strategy instances manage the relevant counters 
(consumeIdleCounter, bpIdleCount) themselves?


---

Reply via email to