GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167326878
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl 
tuple) {
         protected void setupMetrics() {
             for (final Integer interval : 
intervalToTaskToMetricToRegistry.keySet()) {
                 StormTimer timerTask = workerData.getUserTimer();
    -            timerTask.scheduleRecurring(interval, interval, new Runnable() 
{
    -                @Override
    -                public void run() {
    -                    TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(interval),
    -                            (int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
    -                    List<AddressedTuple> metricsTickTuple =
    -                            Lists.newArrayList(new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
    -                    receiveQueue.publish(metricsTickTuple);
    +            timerTask.scheduleRecurring(interval, interval,
    +                () -> {
    +                    TupleImpl tuple = new TupleImpl(workerTopologyContext, 
new Values(interval), Constants.SYSTEM_COMPONENT_ID,
    +                        (int) Constants.SYSTEM_TASK_ID, 
Constants.METRICS_TICK_STREAM_ID);
    +                    AddressedTuple metricsTickTuple = new 
AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
    +                    try {
    +                        receiveQueue.publish(metricsTickTuple);
    +                        receiveQueue.flush();  // avoid buffering
    +                    } catch (InterruptedException e) {
    +                        LOG.warn("Thread interrupted when publishing 
metrics. Setting interrupt flag.");
    +                        Thread.currentThread().interrupt();
    +                        return;
    +                    }
                     }
    -            });
    -        }
    -    }
    -
    -    public void sendUnanchored(Task task, String stream, List<Object> 
values, ExecutorTransfer transfer) {
    -        Tuple tuple = task.getTuple(stream, values);
    -        List<Integer> tasks = task.getOutgoingTasks(stream, values);
    -        for (Integer t : tasks) {
    -            transfer.transfer(t, tuple);
    -        }
    -    }
    -
    -    /**
    -     * Send sampled data to the eventlogger if the global or component 
level debug flag is set (via nimbus api).
    -     */
    -    public void sendToEventLogger(Executor executor, Task taskData, List 
values,
    -                                  String componentId, Object messageId, 
Random random) {
    -        Map<String, DebugOptions> componentDebug = 
executor.getStormComponentDebug().get();
    -        DebugOptions debugOptions = componentDebug.get(componentId);
    -        if (debugOptions == null) {
    -            debugOptions = componentDebug.get(executor.getStormId());
    -        }
    -        double spct = ((debugOptions != null) && 
(debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
    -        if (spct > 0 && (random.nextDouble() * 100) < spct) {
    -            sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
    -                    new Values(componentId, messageId, 
System.currentTimeMillis(), values),
    -                    executor.getExecutorTransfer());
    +            );
             }
         }
     
    -    public void reflectNewLoadMapping(LoadMapping loadMapping) {
    -        for (LoadAwareCustomStreamGrouping g : groupers) {
    -            g.refreshLoad(loadMapping);
    -        }
    -    }
    -
    -    private void registerBackpressure() {
    -        receiveQueue.registerBackpressureCallback(new 
DisruptorBackpressureCallback() {
    -            @Override
    -            public void highWaterMark() throws Exception {
    -                LOG.debug("executor " + executorId + " is congested, set 
backpressure flag true");
    -                
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
    -            }
    -
    -            @Override
    -            public void lowWaterMark() throws Exception {
    -                LOG.debug("executor " + executorId + " is not-congested, 
set backpressure flag false");
    -                
WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
    -            }
    -        });
    -        
receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -        
receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -        
receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE),
 false));
    -    }
    -
         protected void setupTicks(boolean isSpout) {
             final Integer tickTimeSecs = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
    -        boolean enableMessageTimeout = (Boolean) 
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
             if (tickTimeSecs != null) {
    +            boolean enableMessageTimeout = (Boolean) 
topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
                 if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && 
Utils.isSystemId(componentId))
                     || (!enableMessageTimeout && isSpout)) {
    -                LOG.info("Timeouts disabled for executor {}:{}", 
componentId, executorId);
    +                LOG.info("Timeouts disabled for executor " + componentId + 
":" + executorId);
    --- End diff --
    
    nit: why did we go back to String concatenation here instead of keeping the parameterized `{}` placeholders in the LOG.info call?


---

Reply via email to