Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r158208197 --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java --- @@ -18,105 +18,188 @@ package org.apache.storm.executor.bolt; import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; + +import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.daemon.Task; import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.executor.Executor; import org.apache.storm.hooks.info.BoltExecuteInfo; +import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION; +import org.apache.storm.policy.WaitStrategyPark; import org.apache.storm.stats.BoltExecutorStats; import org.apache.storm.task.IBolt; -import org.apache.storm.task.IOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.JCQueue.ExitCondition; +import org.apache.storm.utils.ReflectionUtils; import org.apache.storm.utils.Utils; -import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.JCQueue; import org.apache.storm.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.Callable; +import java.util.function.BooleanSupplier; public class BoltExecutor extends Executor { private static final Logger LOG = LoggerFactory.getLogger(BoltExecutor.class); - private final Callable<Boolean> executeSampler; + private final BooleanSupplier executeSampler; + private final boolean isSystemBoltExecutor; + private final IWaitStrategy consumeWaitStrategy; // employed when no incoming data + private final IWaitStrategy backPressureWaitStrategy; // employed when outbound path is congested + private BoltOutputCollectorImpl outputCollector; public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) { super(workerData, executorId, credentials); this.executeSampler = ConfigUtils.mkStatsSampler(topoConf); + this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID ); + if (isSystemBoltExecutor) { + this.consumeWaitStrategy = makeSystemBoltWaitStrategy(); + } else { + this.consumeWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BOLT_WAIT_STRATEGY)); + this.consumeWaitStrategy.prepare(topoConf, WAIT_SITUATION.BOLT_WAIT); + } + this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY)); + this.backPressureWaitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT); + } - public void init(Map<Integer, Task> idToTask) { + private static IWaitStrategy makeSystemBoltWaitStrategy() { + WaitStrategyPark ws = new WaitStrategyPark(); + HashMap conf = new HashMap<String,Object>(); + conf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 5000); + ws.prepare(conf, WAIT_SITUATION.BOLT_WAIT); + return ws; + } + + public void init(ArrayList<Task> idToTask, int idToTaskBase) { + executorTransfer.initLocalRecvQueues(); while (!stormActive.get()) { Utils.sleep(100); } - this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext()); - - LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet()); - for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) { - Task taskData = entry.getValue(); + if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System bolt doesn't call reportError() + this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext()); + } + LOG.info("Preparing bolt {}:{}", componentId, getTaskIds()); + for (Task taskData : idToTask) { + if (taskData == null) { + //This happens if the min id is too small + continue; + } IBolt boltObject = (IBolt) taskData.getTaskObject(); TopologyContext userContext = taskData.getUserContext(); taskData.getBuiltInMetrics().registerAll(topoConf, userContext); if (boltObject instanceof ICredentialsListener) { ((ICredentialsListener) boltObject).setCredentials(credentials); } if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) { - Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue, - "transfer", workerData.getTransferQueue()); + Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue()); BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext); - Map cachedNodePortToSocket = (Map) workerData.getCachedNodeToPortSocket().get(); + Map cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get(); BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext); BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext); } else { - Map<String, DisruptorQueue> map = ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue); + Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue); BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext); } - IOutputCollector outputCollector = new BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, hasEventLoggers, isDebug); + this.outputCollector = new BoltOutputCollectorImpl(this, taskData, rand, hasEventLoggers, ackingEnabled, isDebug); boltObject.prepare(topoConf, userContext, new OutputCollector(outputCollector)); } openOrPrepareWasCalled.set(true); - LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet()); + LOG.info("Prepared bolt {}:{}", componentId, taskIds); setupMetrics(); } @Override - public Callable<Object> call() throws Exception { - init(idToTask); + public Callable<Long> call() throws Exception { + init(idToTask, idToTaskBase); - return new Callable<Object>() { + return new Callable<Long>() { + private ExitCondition tillNoPendingEmits = () -> pendingEmits.isEmpty(); + int bpIdleCount = 0; + int consumeIdleCounter = 0; @Override - public Object call() throws Exception { - receiveQueue.consumeBatchWhenAvailable(BoltExecutor.this); + public Long call() throws Exception { + boolean pendingEmitsIsEmpty = tryFlushPendingEmits(); + if (pendingEmitsIsEmpty) { + if (bpIdleCount!=0) { + LOG.debug("Ending Back Pressure Wait stretch : {}", bpIdleCount); + } + bpIdleCount = 0; + int consumeCount = receiveQueue.consume(BoltExecutor.this, tillNoPendingEmits); + if (consumeCount == 0) { + if (consumeIdleCounter==0) { + LOG.debug("Invoking consume wait strategy"); + } + consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter); --- End diff -- Could we let wait strategy instances handle relevant counters (consumeIdleCounter, bpIdleCount)?
---