[ 
https://issues.apache.org/jira/browse/STORM-2006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399206#comment-15399206
 ] 

ASF GitHub Bot commented on STORM-2006:
---------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1595#discussion_r72780835
  
    --- Diff: storm-core/src/jvm/org/apache/storm/metric/SystemBolt.java ---
    @@ -156,7 +219,103 @@ private void registerMetrics(TopologyContext context, 
Map<String, String> metric
     
         @Override
         public void execute(Tuple input) {
    -        throw new RuntimeException("Non-system tuples should never be sent 
to __system bolt.");
    +        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) 
input.getValue(0);
    +        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) 
input.getValue(1);
    +        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = 
expander.expandDataPoints(dataPoints);
    +
    +        if (aggregateMode) {
    +            handleMetricTupleInAggregateMode(taskInfo, expandedDataPoints);
    +        } else {
    +            collector.emit(Constants.METRICS_AGGREGATE_STREAM_ID, new 
Values(taskInfo, expandedDataPoints));
    +        }
    +    }
    +
    +    private void 
handleMetricTupleInAggregateMode(IMetricsConsumer.TaskInfo taskInfo, 
Collection<IMetricsConsumer.DataPoint> expandedDataPoints) {
    +        Map<Integer, TaskInfoToDataPointsPair> taskToMetricTupleMap = 
intervalToTaskToMetricTupleMap.get(taskInfo.updateIntervalSecs);
    +        if (taskToMetricTupleMap == null) {
    +            taskToMetricTupleMap = new HashMap<>();
    +            
intervalToTaskToMetricTupleMap.put(taskInfo.updateIntervalSecs, 
taskToMetricTupleMap);
    +        }
    +
    +        taskToMetricTupleMap.put(taskInfo.srcTaskId, new 
TaskInfoToDataPointsPair(taskInfo, expandedDataPoints));
    --- End diff --
    
    I didn't get this. It overrides the older information with newer one. what 
if the data point for same task is received before 
'isOKtoAggregateMetricsTuples` is true. 


> Storm metrics feature improvement: support per-worker level metrics 
> aggregation
> -------------------------------------------------------------------------------
>
>                 Key: STORM-2006
>                 URL: https://issues.apache.org/jira/browse/STORM-2006
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>    Affects Versions: 1.1.0
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>
> Storm provides per-task level metrics which could be huge when topology has a 
> number of tasks. 
> Task level metric is useful for determining load balance between tasks, but 
> it doesn't need to be time-series fashion.
> Before introducing topology level component like TopologyMaster for JStorm, 
> we can utilize SystemBolt to aggregate task level metrics to per-worker level 
> metrics.
> We should provide options and this feature should be turned off by default to 
> keep backward compatibility. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to