Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/816#discussion_r49327965
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/metric/internal/CountStatAndMetric.java ---
    @@ -0,0 +1,211 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.metric.internal;
    +
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import backtype.storm.metric.api.IMetric;
    +
    +/**
    + * Acts as a Count Metric, but also keeps track of approximate counts
    + * for the last 10 mins, 3 hours, 1 day, and all time.
    + */
    +public class CountStatAndMetric implements IMetric{
    +    private final AtomicLong _currentBucket;
    +    // All internal state except for the count of the current bucket are
    +    // protected using a lock on this counter
    +    private long _bucketStart;
    +
    +    //exact variable time, that is added to the current bucket
    +    private long _exactExtra;
    + 
    +    //10 min values
    +    private final int _tmSize;
    +    private final long[] _tmBuckets;
    +    private final long[] _tmTime;
    +    
    +    //3 hour values
    +    private final int _thSize;
    +    private final long[] _thBuckets;
    +    private final long[] _thTime;
    +
    +    //1 day values
    +    private final int _odSize;
    +    private final long[] _odBuckets;
    +    private final long[] _odTime;
    + 
    +    //all time
    +    private long _allTime;
    +
    +    private final TimerTask _task;
    +
    +    /**
    +     * @param numBuckets the number of buckets to divide the time periods 
into.
    +     */
    +    public CountStatAndMetric(int numBuckets) {
    +        this(numBuckets, -1);
    +    }
    +
    +    /**
    +     * Constructor
    +     * @param numBuckets the number of buckets to divide the time periods 
into.
    +     * @param startTime if positive the simulated time to start the from.
    +     */
    +    CountStatAndMetric(int numBuckets, long startTime){
    +        numBuckets = Math.max(numBuckets, 2);
    +        //We want to capture the full time range, so the target size is as
    +        // if we had one bucket less, then we do
    +        _tmSize = 10 * 60 * 1000 / (numBuckets - 1);
    +        _thSize = 3 * 60 * 60 * 1000 / (numBuckets - 1);
    +        _odSize = 24 * 60 * 60 * 1000 / (numBuckets - 1);
    +        if (_tmSize < 1 || _thSize < 1 || _odSize < 1) {
    +            throw new IllegalArgumentException("number of buckets is too 
large to be supported");
    +        }
    +        _tmBuckets = new long[numBuckets];
    +        _tmTime = new long[numBuckets];
    +        _thBuckets = new long[numBuckets];
    +        _thTime = new long[numBuckets];
    +        _odBuckets = new long[numBuckets];
    +        _odTime = new long[numBuckets];
    +        _allTime = 0;
    +        _exactExtra = 0;
    +
    +        _bucketStart = startTime >= 0 ? startTime : 
System.currentTimeMillis();
    +        _currentBucket = new AtomicLong(0);
    +        if (startTime < 0) {
    +            _task = new Fresher();
    +            MetricStatTimer._timer.scheduleAtFixedRate(_task, _tmSize, 
_tmSize);
    +        } else {
    +            _task = null;
    +        }
    +    }
    +
    +    /**
    +     * Increase the count by the given value.
    +     *
    +     * @param count number to count
    +     */
    +    public void incBy(long count) {
    +        _currentBucket.addAndGet(count);
    +    }
    +
    +   
    +
    +    @Override
    +    public synchronized Object getValueAndReset() {
    +        return getValueAndReset(System.currentTimeMillis());
    +    }
    +
    +    synchronized Object getValueAndReset(long now) {
    +        long value = _currentBucket.getAndSet(0);
    +        long timeSpent = now - _bucketStart;
    +        long ret = value + _exactExtra;
    +        _bucketStart = now;
    +        _exactExtra = 0;
    +        rotateBuckets(value, timeSpent);
    +        return ret;
    +    }
    +
    +    synchronized void rotateSched(long now) {
    +        long value = _currentBucket.getAndSet(0);
    +        long timeSpent = now - _bucketStart;
    +        _exactExtra += value;
    +        _bucketStart = now;
    +        rotateBuckets(value, timeSpent);
    +    }
    +
    +    synchronized void rotateBuckets(long value, long timeSpent) {
    +        rotate(value, timeSpent, _tmSize, _tmTime, _tmBuckets);
    +        rotate(value, timeSpent, _thSize, _thTime, _thBuckets);
    +        rotate(value, timeSpent, _odSize, _odTime, _odBuckets);
    +        _allTime += value;
    +    }
    +
    +    private synchronized void rotate(long value, long timeSpent, long 
targetSize, long [] times, long [] buckets) {
    +        times[0] += timeSpent;
    +        buckets[0] += value;
    +
    +        long currentTime = 0;
    +        long currentVal = 0;
    +        if (times[0] >= targetSize) {
    +            for (int i = 0; i < buckets.length; i++) {
    --- End diff --
    
    Yes we could, but I don't think it is going to make that much of a 
difference.  I purposely moved rotating the metrics off of the critical path.  
The call to rotate happens in a background thread, that is shared by all 
similar metrics in a worker process.  This allows the critical path, the thing 
that happens all of the time, to be just an atomic increment.  Rotating only 
happens every 30 seconds or so too.  So I suspect the best that it is going to 
do is reduce the overall CPU the worker uses by a tiny amount.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to