Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161358493
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -155,134 +150,159 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, 
assignmentId, port, workerId, topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, 
stormClusterState, initCreds, initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map<String, Object> topologyConf, 
IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, 
String> initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP 
so that supervisor knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that 
supervisor knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that 
slow-loading tasks don't cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, 
initCreds)
    -                                .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +        List<Executor> execs = new ArrayList<>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                Executor executor = LocalExecutor.mkExecutor(workerState, 
e, initCreds);
    +                execs.add( executor );
    +                
workerState.localReceiveQueues.put(executor.getTaskIds().get(0), 
executor.getReceiveQueue());
    +            } else {
    +                Executor executor = Executor.mkExecutor(workerState, e, 
initCreds);
    +                
workerState.localReceiveQueues.put(executor.getTaskIds().get(0), 
executor.getReceiveQueue());
    +                execs.add(executor);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, 
batchEnd) -> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, 
ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +        List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
    +        for (Executor executor : execs) {
    +            newExecutors.add(executor.execute());
    +        }
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for 
remote tasks to remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    
workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        // This thread will send out messages destined for remote tasks 
(on other workers)
    +        transferThread = workerState.makeTransferThread();
    +        transferThread.setName("Worker-Transfer");
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler 
=
    -                    mkDisruptorBackpressureHandler(workerState);
    -                
workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    
.setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = 
mkBackpressureHandler();
    -                backpressureThread = new 
WorkerBackpressureThread(workerState.backpressureTrigger, workerState, 
backpressureCallback);
    -                if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, 
workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = 
ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, 
workerState::refreshThrottle);
    -                }
    +        credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
     
    -                credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
    +        establishLogSettingCallback();
     
    -                establishLogSettingCallback();
    +        workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new 
Runnable() {
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), 
new Runnable() {
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) 
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new 
Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files 
to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) 
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new 
Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have 
updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the 
version files to be ignored
    -                                    
LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at 
different times, and avoids thundering herd
    -                if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    -                    
workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, 
workerState::refreshLoad);
    -                }
    +                    }
    +                });
    +
    +        // The jitter allows the clients to get the data at different 
times, and avoids thundering herd
    +        if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 
500, workerState::refreshLoad);
    +        }
    +
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) 
conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, 
(Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        setupFlushTupleTimer(topologyConf, newExecutors);
    +        setupBackPressureCheckTimer(topologyConf);
    +
    +        LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", 
workerId, topologyId, assignmentId, port);
    +        return this;
    +    }
     
    -                LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    -                LOG.info("Worker {} for storm {} on {}:{}  has finished 
loading", workerId, topologyId, assignmentId, port);
    -                return this;
    -            };
    +    private void setupFlushTupleTimer(final Map<String, Object> 
topologyConf, final List<IRunningExecutor> executors) {
    +        final Integer producerBatchSize = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        final Integer xferBatchSize = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
    +        final Long flushIntervalMillis = 
ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
    +        if ((producerBatchSize == 1 && xferBatchSize == 1) || 
flushIntervalMillis == 0) {
    +            LOG.info("Flush Tuple generation disabled. 
producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", 
producerBatchSize, xferBatchSize, flushIntervalMillis);
    +            return;
    +        }
    +
    +        
workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, 
flushIntervalMillis, new Runnable() {
    +            @Override
    +            public void run() {
    +                // send flush tuple to all executors
    +                for (int i = 0; i < executors.size(); i++) {
    +                    IRunningExecutor exec = executors.get(i);
    +                    if (exec.getExecutorId().get(0) != 
Constants.SYSTEM_TASK_ID) {
    +                        exec.getExecutor().publishFlushTuple();
    +                    }
    +                }
    +            }
             });
    +        LOG.info("Flush tuple will be generated every {} millis", 
flushIntervalMillis);
    +    }
     
    +    private void setupBackPressureCheckTimer(final Map<String, Object> 
topologyConf) {
    +        final Integer workerCount = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    +        if (workerCount <= 1) {
    +            LOG.info("BackPressure change checking is disabled as there is 
only one worker");
    +            return;
    +        }
    +        final Long bpCheckIntervalMs = 
ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
    +        
workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, 
bpCheckIntervalMs, new Runnable() {
    --- End diff --
    
    fixed


---

Reply via email to