[ 
https://issues.apache.org/jira/browse/FLUME-1484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Malaska updated FLUME-1484:
-------------------------------

    Attachment: FLUME-1411-IDEA.patch

This is not a completely finished patch, but an example of a possible solution. 
If you like the solution, I'll finish the patch.

This solution supports two types of averages:
1. Averages since start
2. Rolling Averages (The rolling interval can be set through config properties)
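
To make the difference between the two averages concrete, here is a minimal sketch. It is not the code in the attached patch: the class name ThroughputAverages, its method names, and the choice to pass timestamps in explicitly (so the behaviour is deterministic) are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch, not taken from the patch: tracks an events-per-second
 * average since start and over a rolling window of configurable length.
 */
class ThroughputAverages {
  private final long startMillis;
  private final long windowMillis;
  private long totalEvents;   // everything recorded since start
  private long windowEvents;  // only what is still inside the rolling window
  // Each sample is {timestampMillis, eventCount}, oldest first.
  private final Deque<long[]> samples = new ArrayDeque<>();

  ThroughputAverages(long startMillis, long windowMillis) {
    this.startMillis = startMillis;
    this.windowMillis = windowMillis;
  }

  /** Records that {@code events} events were processed at {@code nowMillis}. */
  synchronized void record(long nowMillis, long events) {
    totalEvents += events;
    windowEvents += events;
    samples.addLast(new long[] {nowMillis, events});
    evict(nowMillis);
  }

  /** Events per second over the whole lifetime of the counter. */
  synchronized double averageSinceStart(long nowMillis) {
    long elapsed = nowMillis - startMillis;
    return elapsed <= 0 ? 0.0 : totalEvents * 1000.0 / elapsed;
  }

  /** Events per second over the last {@code windowMillis} milliseconds. */
  synchronized double rollingAverage(long nowMillis) {
    evict(nowMillis);
    // Before one full window has elapsed, divide by the time actually covered.
    long span = Math.min(windowMillis, nowMillis - startMillis);
    return span <= 0 ? 0.0 : windowEvents * 1000.0 / span;
  }

  /** Drops samples that are at least one full window older than now. */
  private void evict(long nowMillis) {
    while (!samples.isEmpty()
        && samples.peekFirst()[0] <= nowMillis - windowMillis) {
      windowEvents -= samples.removeFirst()[1];
    }
  }
}
```

With a 5-second window, recording 100 events at each second for 10 seconds and then nothing for another 10 seconds leaves the since-start average at 50 events/s, while the rolling average has already dropped to 0. This is the kind of divergence between the two figures that matters when a sink stalls.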

I added two getters for these averages to SinkCounter.

I also added a JUnit test that exercises both averages and shows how quickly 
they diverge.
Please let me know what you think.
                
> Flume support throughput in Agent, Source, Sink level at JMX
> ------------------------------------------------------------
>
>                 Key: FLUME-1484
>                 URL: https://issues.apache.org/jira/browse/FLUME-1484
>             Project: Flume
>          Issue Type: Improvement
>          Components: Node, Sinks+Sources
>    Affects Versions: v1.2.0
>            Reporter: Denny Ye
>            Assignee: Ted Malaska
>         Attachments: FLUME-1411-IDEA.patch
>
>
> From a user's point of view, we would like to see the current throughput in a 
> monitoring tool. A web UI would be best, of course, but JMX is a simple way to 
> implement throughput monitoring. 
> The Agent should expose input and output throughput based on its Sources and 
> Sinks.
> Here is some simple code from my environment for monitoring the throughput of a Source.
> {code:title=ThroughputCounter.java|borderStyle=solid}
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.flume.instrumentation.SourceCounter;
> public class ThroughputCounter {
>   private volatile boolean isRunning;   
>   private AtomicInteger cache = new AtomicInteger();
>         
>   SourceCounter sourceCounter;
>   public ThroughputCounter(SourceCounter sourceCounter) {
>     this.sourceCounter = sourceCounter;
>   }
>         
>   public void start() {
>     isRunning = true;
>           
>     Counter counter = new Counter();
>     counter.start();
>   }
>         
>   public void stop() {
>     isRunning = false;
>   }
>         
>         
>   public void addWriteBytes(int bytes) {
>     cache.getAndAdd(bytes);
>   }
>         
>   private class Counter extends Thread {
>           
>     Counter() {
>       super("ThroughputCounterThread");       
>     }
>               
>     public void run() {
>       while (isRunning) {
>         try {
>           Thread.sleep(1000);
>           sourceCounter.incrementSourceThroughput(cache.getAndSet(0));
>         } catch (Exception e) {
>           e.printStackTrace();
>         }
>       }
>     }
>   }
>         
> }
> {code} 
