[ https://issues.apache.org/jira/browse/FLUME-1484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13455049#comment-13455049 ]
Ted Malaska edited comment on FLUME-1484 at 9/14/12 4:32 AM:
-------------------------------------------------------------

This is not a completely finished patch but an example of a possible solution. If you like the solution I'll finish the patch.

This solution supports two types of averages:
1. Averages since start
2. Rolling averages (the rolling interval can be set through config properties)

I added two getters for these averages in sinkCounter. I also added a JUnit test that exercises both averages and shows how quickly they deviate.

Please let me know what you think.

> Flume support throughput in Agent, Source, Sink level at JMX
> ------------------------------------------------------------
>
>                 Key: FLUME-1484
>                 URL: https://issues.apache.org/jira/browse/FLUME-1484
>             Project: Flume
>          Issue Type: Improvement
>          Components: Node, Sinks+Sources
>    Affects Versions: v1.2.0
>            Reporter: Denny Ye
>            Assignee: Ted Malaska
>         Attachments: FLUME-1411-IDEA.patch
>
>
> From a user's point of view, we would like to see the current throughput in
> one of the monitoring tools. A WebUI would be best, of course, but JMX is a
> simple way to implement throughput monitoring.
> The Agent should expose input and output throughput aggregated over its
> Sources and Sinks.
> Here is some simple code from my environment for monitoring the throughput of a Source.
> {code:title=ThroughputCounter.java|borderStyle=solid}
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.flume.instrumentation.SourceCounter;
>
> public class ThroughputCounter {
>   private volatile boolean isRunning;
>   private AtomicInteger cache = new AtomicInteger();
>
>   SourceCounter sourceCounter;
>
>   public ThroughputCounter(SourceCounter sourceCounter) {
>     this.sourceCounter = sourceCounter;
>   }
>
>   public void start() {
>     isRunning = true;
>
>     Counter counter = new Counter();
>     counter.start();
>   }
>
>   public void stop() {
>     isRunning = false;
>   }
>
>   public void addWriteBytes(int bytes) {
>     cache.getAndAdd(bytes);
>   }
>
>   private class Counter extends Thread {
>
>     Counter() {
>       super("ThroughputCounterThread");
>     }
>
>     public void run() {
>       while (isRunning) {
>         try {
>           Thread.sleep(1000);
>           sourceCounter.incrementSourceThroughput(
>               cache.getAndSet(0));
>         } catch (Exception e) {
>           e.printStackTrace();
>         }
>       }
>     }
>   }
>
> }
> {code}
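
For discussion, here is a minimal sketch of how the two averages mentioned in the comment (average since start and rolling average) could be computed. It is not taken from the attached patch: the class name ThroughputAverages, its methods, and the choice to pass timestamps in explicitly are all assumptions made for illustration, and none of this is existing Flume API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of Flume or the attached patch.
// Reports events per second since start and over a rolling window.
class ThroughputAverages {
    private final long startMillis;
    private final long windowMillis;
    // Each sample is {timestampMillis, eventCount}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();
    private long total;        // events recorded since start
    private long windowTotal;  // events currently inside the window

    ThroughputAverages(long startMillis, long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.startMillis = startMillis;
        this.windowMillis = windowMillis;
    }

    // Record a batch of events observed at the given time.
    synchronized void record(long nowMillis, long events) {
        total += events;
        windowTotal += events;
        samples.addLast(new long[] {nowMillis, events});
        evict(nowMillis);
    }

    // Average events per second over the whole lifetime.
    synchronized double averageSinceStart(long nowMillis) {
        long elapsed = nowMillis - startMillis;
        return elapsed <= 0 ? 0.0 : total * 1000.0 / elapsed;
    }

    // Average events per second over the most recent window.
    synchronized double rollingAverage(long nowMillis) {
        evict(nowMillis);
        // Until a full window has elapsed, divide by the time actually covered.
        long covered = Math.min(windowMillis, nowMillis - startMillis);
        return covered <= 0 ? 0.0 : windowTotal * 1000.0 / covered;
    }

    // Drop samples that have fallen out of the rolling window.
    private void evict(long nowMillis) {
        long cutoff = nowMillis - windowMillis;
        while (!samples.isEmpty() && samples.peekFirst()[0] <= cutoff) {
            windowTotal -= samples.removeFirst()[1];
        }
    }
}
```

In this sketch the window length would come from whatever config property the patch ends up defining for the rolling interval; a caller would pass System.currentTimeMillis() for the timestamps. After a quiet period the rolling average drops quickly while the since-start average decays slowly, which is the kind of deviation between the two that the comment refers to.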