FLUME-1482 Flume should support exposing metrics via HTTP in JSON/some other web service format.
(Hari Shreedharan via Mubarak Seyed) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ce01ec1c Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ce01ec1c Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ce01ec1c Branch: refs/heads/cdh-1.2.0+24_intuit Commit: ce01ec1cf63eb0f29a4e53498a144104a08f5d3f Parents: 8d2755a Author: Mubarak Seyed <[email protected]> Authored: Tue Aug 14 01:25:11 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:04 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 1 + flume-ng-core/pom.xml | 5 + .../org/apache/flume/channel/MemoryChannel.java | 2 + .../flume/channel/PseudoTxnMemoryChannel.java | 32 ++++- .../flume/instrumentation/ChannelCounter.java | 14 ++- .../flume/instrumentation/ChannelCounterMBean.java | 11 ++ .../flume/instrumentation/GangliaServer.java | 70 +++----- .../instrumentation/MonitoredCounterGroup.java | 4 + .../flume/instrumentation/MonitoringType.java | 3 +- .../flume/instrumentation/SinkCounterMBean.java | 9 + .../flume/instrumentation/SourceCounterMBean.java | 10 +- .../instrumentation/http/HTTPMetricsServer.java | 128 ++++++++++++++ .../flume/instrumentation/util/JMXPollUtil.java | 84 ++++++++++ .../http/TestHTTPMetricsServer.java | 130 +++++++++++++++ .../flume/instrumentation/util/JMXTestUtils.java | 38 +++++ .../instrumentation/util/TestJMXPollUtil.java | 87 ++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 50 ++++++ pom.xml | 7 + 18 files changed, 632 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index bd2558a..e7735e8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -239,6 +239,7 @@ public class FileChannel extends BasicChannelSemantics { if (open) { channelCounter.start(); channelCounter.setChannelSize(getDepth()); + channelCounter.setChannelCapacity(capacity); } super.start(); } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index 5ed5461..f3b3240 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -165,6 +165,11 @@ limitations under the License. </dependency> <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index 65b0166..c72e97c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -247,6 +247,8 @@ public class MemoryChannel extends BasicChannelSemantics { public synchronized void start() { channelCounter.start(); channelCounter.setChannelSize(queue.size()); + channelCounter.setChannelCapacity(Long.valueOf( + queue.size() + queue.remainingCapacity())); super.start(); } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java index 489d3e5..cc391c4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java @@ -28,6 +28,7 @@ import org.apache.flume.Event; import org.apache.flume.Transaction; import com.google.common.base.Preconditions; +import org.apache.flume.instrumentation.ChannelCounter; /** * <p> @@ -83,6 +84,7 @@ public class PseudoTxnMemoryChannel extends AbstractChannel { private BlockingQueue<Event> queue; private Integer keepAlive; + private ChannelCounter channelCounter; @Override public void configure(Context context) { @@ -98,27 +100,51 @@ public class PseudoTxnMemoryChannel extends AbstractChannel { } queue = new ArrayBlockingQueue<Event>(capacity); + if(channelCounter == null) { + channelCounter = new ChannelCounter(getName()); + } + } + + @Override + public void start(){ + channelCounter.start(); + channelCounter.setChannelSize(queue.size()); + channelCounter.setChannelSize( + Long.valueOf(queue.size() + queue.remainingCapacity())); + super.start(); + } + + @Override + public void stop(){ + channelCounter.setChannelSize(queue.size()); + channelCounter.stop(); + super.stop(); } @Override public void put(Event event) { Preconditions.checkState(queue != null, "No queue defined (Did you forget to configure me?"); - + channelCounter.incrementEventPutAttemptCount(); try { queue.put(event); } catch (InterruptedException ex) { throw new ChannelException("Failed to put(" + event + ")", ex); } + channelCounter.addToEventPutSuccessCount(1); + channelCounter.setChannelSize(queue.size()); } @Override public Event take() { Preconditions.checkState(queue != null, "No queue defined (Did you forget to configure me?"); - + channelCounter.incrementEventTakeAttemptCount(); try { - return queue.poll(keepAlive, TimeUnit.SECONDS); + Event e = queue.poll(keepAlive, TimeUnit.SECONDS); + channelCounter.addToEventTakeSuccessCount(1); + channelCounter.setChannelSize(queue.size()); + return e; } catch (InterruptedException ex) { throw new ChannelException("Failed to take()", ex); } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java index 316384a..602481e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java @@ -35,10 +35,13 @@ public class ChannelCounter extends MonitoredCounterGroup implements private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; + private static final String COUNTER_CHANNEL_CAPACITY = + "channel.capacity"; + private static final String[] ATTRIBUTES = { COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT, COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS, - COUNTER_EVENT_TAKE_SUCCESS + COUNTER_EVENT_TAKE_SUCCESS, COUNTER_CHANNEL_CAPACITY }; public ChannelCounter(String name) { @@ -89,4 +92,13 @@ public class ChannelCounter extends MonitoredCounterGroup implements public long addToEventTakeSuccessCount(long delta) { return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta); } + + public void setChannelCapacity(long capacity){ + set(COUNTER_CHANNEL_CAPACITY, capacity); + } + + public long getChannelCapacity(){ + return get(COUNTER_CHANNEL_CAPACITY); + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java index 799dd5d..f0c3ef3 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java @@ -18,6 +18,13 @@ */ package org.apache.flume.instrumentation; +/** + * This interface represents a channel counter mbean. Any class implementing + * this interface must sub-class + * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This + * interface might change between minor releases. Please see + * {@linkplain org.apache.flume.instrumentation.ChannelCounter} class. + */ public interface ChannelCounterMBean { long getChannelSize(); @@ -33,4 +40,8 @@ public interface ChannelCounterMBean { long getStartTime(); long getStopTime(); + + long getChannelCapacity(); + + String getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java index d93cd33..8d34fee 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java @@ -28,6 +28,7 @@ import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,6 +42,7 @@ import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.api.HostInfo; import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.instrumentation.util.JMXPollUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,19 +51,16 @@ import org.slf4j.LoggerFactory; * once every 60 seconds). This implementation can send data to ganglia 3 and * ganglia 3.1. <p> * - * <b>Mandatory Parameters:</b><p> - * <tt>hosts: </tt> List of comma separated hostname:ports of ganglia - * servers to report metrics to. <p> - * <b>Optional Parameters: </b><p> - * <tt>pollFrequency:</tt>Interval in seconds between consecutive reports to - * ganglia servers. Default = 60 seconds.<p> + * <b>Mandatory Parameters:</b><p> <tt>hosts: </tt> List of comma separated + * hostname:ports of ganglia servers to report metrics to. <p> <b>Optional + * Parameters: </b><p> <tt>pollFrequency:</tt>Interval in seconds between + * consecutive reports to ganglia servers. Default = 60 seconds.<p> * <tt>isGanglia3:</tt> Report to ganglia 3 ? Default = false - reports to * ganglia 3.1. * * * */ - public class GangliaServer implements MonitorService { /* * The Ganglia protocol specific stuff: the xdr_* methods @@ -284,14 +283,13 @@ public class GangliaServer implements MonitorService { public void configure(Context context) { this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, 60); String localHosts = context.getString(this.CONF_HOSTS); - if(localHosts == null || localHosts.isEmpty()){ + if (localHosts == null || localHosts.isEmpty()) { throw new ConfigurationException("Hosts list cannot be empty."); } this.hosts = this.getHostsFromString(localHosts); this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false); } - private List<HostInfo> getHostsFromString(String hosts) throws FlumeException { List<HostInfo> hostInfoList = new ArrayList<HostInfo>(); @@ -316,6 +314,7 @@ public class GangliaServer implements MonitorService { } return hostInfoList; } + /** * Worker which polls JMX for all mbeans with * {@link javax.management.ObjectName} within the flume namespace: @@ -332,47 +331,24 @@ public class GangliaServer implements MonitorService { @Override public void run() { try { - Set<ObjectInstance> queryMBeans = null; - try { - queryMBeans = mbeanServer.queryMBeans( - null, null); - } catch (Exception ex) { - logger.error("Could not get Mbeans for monitoring", ex); - Throwables.propagate(ex); - } - for (ObjectInstance obj : queryMBeans) { - try { - if (!obj.getObjectName().toString().startsWith("org.apache.flume")) { - continue; - } - MBeanAttributeInfo[] attrs = mbeanServer. - getMBeanInfo(obj.getObjectName()).getAttributes(); - String strAtts[] = new String[attrs.length]; - for (int i = 0; i < strAtts.length; i++) { - strAtts[i] = attrs[i].getName(); - } - AttributeList attrList = mbeanServer.getAttributes( - obj.getObjectName(), strAtts); - String component = obj.getObjectName().toString().substring( - obj.getObjectName().toString().indexOf('=') + 1); - for (Object attr : attrList) { - Attribute localAttr = (Attribute) attr; - if (isGanglia3) { - server.createGangliaMessage(GANGLIA_CONTEXT + component + "." - + localAttr.getName(), - localAttr.getValue().toString()); - } else { - server.createGangliaMessage31(GANGLIA_CONTEXT + component + "." - + localAttr.getName(), - localAttr.getValue().toString()); - } - server.sendToGangliaNodes(); + Map<String, Map<String, String>> metricsMap = + JMXPollUtil.getAllMBeans(); + for (String component : metricsMap.keySet()) { + Map<String, String> attributeMap = metricsMap.get(component); + for (String attribute : attributeMap.keySet()) { + if (isGanglia3) { + server.createGangliaMessage(GANGLIA_CONTEXT + component + "." + + attribute, + attributeMap.get(attribute)); + } else { + server.createGangliaMessage31(GANGLIA_CONTEXT + component + "." + + attribute, + attributeMap.get(attribute)); } - } catch (Exception ex) { - logger.error("Error getting mbean attributes", ex); + server.sendToGangliaNodes(); } } - } catch(Throwable t) { + } catch (Throwable t) { logger.error("Unexpected error", t); } } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java index a03d004..6bc31ef 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java @@ -139,4 +139,8 @@ public abstract class MonitoredCounterGroup { SINK_PROCESSOR, SINK }; + + public String getType(){ + return type.name(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java index d132995..443335c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java @@ -24,7 +24,8 @@ package org.apache.flume.instrumentation; */ public enum MonitoringType { OTHER(null), - GANGLIA(org.apache.flume.instrumentation.GangliaServer.class); + GANGLIA(org.apache.flume.instrumentation.GangliaServer.class), + HTTP(org.apache.flume.instrumentation.http.HTTPMetricsServer.class); private Class<? extends MonitorService> monitoringClass; http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java index 6905d49..472a4dd 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java @@ -17,6 +17,13 @@ * under the License. */ package org.apache.flume.instrumentation; +/** + * This interface represents a sink counter mbean. Any class implementing + * this interface must sub-class + * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This + * interface might change between minor releases. Please see + * {@linkplain org.apache.flume.instrumentation.SinkCounter} class. + */ public interface SinkCounterMBean { @@ -39,4 +46,6 @@ public interface SinkCounterMBean { long getStartTime(); long getStopTime(); + + String getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java index e6612d5..792e689 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java @@ -17,7 +17,13 @@ * under the License. */ package org.apache.flume.instrumentation; - +/** + * This interface represents a source counter mbean. Any class implementing + * this interface must sub-class + * {@linkplain org.apache.flume.instrumentation.MonitoredCounterGroup}. This + * interface might change between minor releases. Please see + * {@linkplain org.apache.flume.instrumentation.SourceCounter} class. + */ public interface SourceCounterMBean { long getEventReceivedCount(); @@ -35,4 +41,6 @@ public interface SourceCounterMBean { long getStartTime(); long getStopTime(); + + String getType(); } http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java new file mode 100644 index 0000000..373e344 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/http/HTTPMetricsServer.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.instrumentation.http; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Map; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.flume.Context; +import org.apache.flume.instrumentation.MonitorService; +import org.apache.flume.instrumentation.util.JMXPollUtil; +import org.mortbay.jetty.Request; +import org.mortbay.jetty.Server; +import org.mortbay.jetty.handler.AbstractHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Monitor service implementation that runs a web server on a configurable + * port and returns the metrics for components in JSON format. <p> Optional + * parameters: <p> <tt>port</tt> : The port on which the server should listen + * to.<p> Returns metrics in the following format: <p> + * + * {<p> "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"} + * <p> "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"} + * <p> } + */ +public class HTTPMetricsServer implements MonitorService { + + private Server jettyServer; + private int port; + private static Logger LOG = LoggerFactory.getLogger(HTTPMetricsServer.class); + public static int DEFAULT_PORT = 41414; + public static String CONFIG_PORT = "port"; + + @Override + public void start() { + jettyServer = new Server(port); + //We can use Contexts etc if we have many urls to handle. For one url, + //specifying a handler directly is the most efficient. + jettyServer.setHandler(new HTTPMetricsHandler()); + try { + jettyServer.start(); + while (!jettyServer.isStarted()) { + Thread.sleep(500); + } + } catch (Exception ex) { + LOG.error("Error starting Jetty. JSON Metrics may not be available.", ex); + } + + } + + @Override + public void stop() { + try { + jettyServer.stop(); + jettyServer.join(); + } catch (Exception ex) { + LOG.error("Error stopping Jetty. JSON Metrics may not be available.", ex); + } + + } + + @Override + public void configure(Context context) { + port = context.getInteger(CONFIG_PORT, DEFAULT_PORT); + } + + private class HTTPMetricsHandler extends AbstractHandler { + + Type mapType = + new TypeToken<Map<String, Map<String, String>>>() { + }.getType(); + Gson gson = new Gson(); + + @Override + public void handle(String target, + HttpServletRequest request, + HttpServletResponse response, + int dispatch) throws IOException, ServletException { + // /metrics is the only place to pull metrics. + //If we want to use any other url for something else, we should make sure + //that for metrics only /metrics is used to prevent backward + //compatibility issues. + if (target.equals("/")) { + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + response.getWriter().write("For Flume metrics please click" + + " <a href = \"./metrics\"> here</a>."); + response.flushBuffer(); + ((Request) request).setHandled(true); + return; + } else if (target.equalsIgnoreCase("/metrics")) { + response.setContentType("application/json;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); + String json = gson.toJson(metricsMap, mapType); + response.getWriter().write(json); + response.flushBuffer(); + ((Request) request).setHandled(true); + return; + } + response.sendError(HttpServletResponse.SC_NOT_FOUND); + response.flushBuffer(); + //Not handling the request returns a Not found error page. + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java new file mode 100644 index 0000000..cbd6c35 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/util/JMXPollUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.instrumentation.util; + +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import java.lang.management.ManagementFactory; +import java.util.Map; +import java.util.Set; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JMXPollUtil { + + private static Logger LOG = LoggerFactory.getLogger(JMXPollUtil.class); + private static MBeanServer mbeanServer = ManagementFactory. + getPlatformMBeanServer(); + + public static Map<String, Map<String, String>> getAllMBeans() { + Map<String, Map<String, String>> mbeanMap = Maps.newHashMap(); + Set<ObjectInstance> queryMBeans = null; + try { + queryMBeans = mbeanServer.queryMBeans(null, null); + } catch (Exception ex) { + LOG.error("Could not get Mbeans for monitoring", ex); + Throwables.propagate(ex); + } + for (ObjectInstance obj : queryMBeans) { + try { + if (!obj.getObjectName().toString().startsWith("org.apache.flume")) { + continue; + } + MBeanAttributeInfo[] attrs = mbeanServer. + getMBeanInfo(obj.getObjectName()).getAttributes(); + String strAtts[] = new String[attrs.length]; + for (int i = 0; i < strAtts.length; i++) { + strAtts[i] = attrs[i].getName(); + } + AttributeList attrList = mbeanServer.getAttributes( + obj.getObjectName(), strAtts); + String component = obj.getObjectName().toString().substring( + obj.getObjectName().toString().indexOf('=') + 1); + Map<String, String> attrMap = Maps.newHashMap(); + + + for (Object attr : attrList) { + Attribute localAttr = (Attribute) attr; + if(localAttr.getName().equalsIgnoreCase("type")){ + component = localAttr.getValue()+ "." + component; + } + attrMap.put(localAttr.getName(), localAttr.getValue().toString()); + } + mbeanMap.put(component, attrMap); + } catch (Exception e) { + LOG.error("Unable to poll JMX for metrics.", e); + } + } + return mbeanMap; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java new file mode 100644 index 0000000..a2a1c30 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.instrumentation.http; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Map; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.PseudoTxnMemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.instrumentation.MonitorService; +import org.apache.flume.instrumentation.util.JMXTestUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class TestHTTPMetricsServer { + + Channel memChannel = new MemoryChannel(); + Channel pmemChannel = new PseudoTxnMemoryChannel(); + Type mapType = + new TypeToken<Map<String, Map<String, String>>>() { + }.getType(); + Gson gson = new Gson(); + + @Test + public void testJSON() throws Exception { + memChannel.setName("memChannel"); + pmemChannel.setName("pmemChannel"); + Context c = new Context(); + Configurables.configure(memChannel, c); + Configurables.configure(pmemChannel, c); + memChannel.start(); + pmemChannel.start(); + Transaction txn = memChannel.getTransaction(); + txn.begin(); + memChannel.put(EventBuilder.withBody("blah".getBytes())); + memChannel.put(EventBuilder.withBody("blah".getBytes())); + txn.commit(); + txn.close(); + + txn = memChannel.getTransaction(); + txn.begin(); + memChannel.take(); + txn.commit(); + txn.close(); + + + Transaction txn2 = pmemChannel.getTransaction(); + txn2.begin(); + pmemChannel.put(EventBuilder.withBody("blah".getBytes())); + pmemChannel.put(EventBuilder.withBody("blah".getBytes())); + txn2.commit(); + txn2.close(); + + txn2 = pmemChannel.getTransaction(); + txn2.begin(); + pmemChannel.take(); + txn2.commit(); + txn2.close(); + + testWithPort(5467); + testWithPort(33434); + testWithPort(44343); + testWithPort(0); + memChannel.stop(); + pmemChannel.stop(); + } + + private void testWithPort(int port) throws Exception { + MonitorService srv = new HTTPMetricsServer(); + Context context = new Context(); + if(port > 1024){ + context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port)); + } else { + port = HTTPMetricsServer.DEFAULT_PORT; + } + srv.configure(context); + srv.start(); + Thread.sleep(1000); + URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + BufferedReader reader = new BufferedReader( + new InputStreamReader(conn.getInputStream())); + String line; + String result = ""; + while ((line = reader.readLine()) != null) { + result += line; + } + reader.close(); + Map<String, Map<String, String>> mbeans = gson.fromJson(result, mapType); + Assert.assertNotNull(mbeans); + Map<String, String> memBean = mbeans.get("CHANNEL.memChannel"); + Assert.assertNotNull(memBean); + JMXTestUtils.checkChannelCounterParams(memBean); + Map<String, String> pmemBean = mbeans.get("CHANNEL.pmemChannel"); + Assert.assertNotNull(pmemBean); + JMXTestUtils.checkChannelCounterParams(pmemBean); + srv.stop(); + System.out.println(String.valueOf(port) + "test success!"); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java new file mode 100644 index 0000000..a392e0c --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/JMXTestUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.instrumentation.util; + +import java.util.Map; +import org.junit.Assert; + +/** + * + */ +public class JMXTestUtils { + + public static void checkChannelCounterParams(Map<String, String> attrs) { + Assert.assertNotNull(attrs.get("StartTime")); + Assert.assertNotNull(attrs.get("StopTime")); + Assert.assertTrue(Long.parseLong(attrs.get("ChannelSize")) != 0); + Assert.assertTrue(Long.parseLong(attrs.get("EventPutAttemptCount")) == 2); + Assert.assertTrue(Long.parseLong(attrs.get("EventTakeAttemptCount")) == 1); + Assert.assertTrue(Long.parseLong(attrs.get("EventPutSuccessCount")) == 2); + Assert.assertTrue(Long.parseLong(attrs.get("EventTakeSuccessCount")) == 1); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java new file mode 100644 index 0000000..71340f5 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/util/TestJMXPollUtil.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.instrumentation.util; + +import java.util.Map; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.PseudoTxnMemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.junit.Assert; +import org.junit.Test; + +/** + * + */ +public class TestJMXPollUtil { + + Channel memChannel = new MemoryChannel(); + Channel pmemChannel = new PseudoTxnMemoryChannel(); + + @Test + public void testJMXPoll() { + memChannel.setName("memChannel"); + pmemChannel.setName("pmemChannel"); + Context c = new Context(); + Configurables.configure(memChannel, c); + Configurables.configure(pmemChannel, c); + memChannel.start(); + pmemChannel.start(); + Transaction txn = memChannel.getTransaction(); + txn.begin(); + memChannel.put(EventBuilder.withBody("blah".getBytes())); + memChannel.put(EventBuilder.withBody("blah".getBytes())); + txn.commit(); + txn.close(); + + txn = memChannel.getTransaction(); + txn.begin(); + memChannel.take(); + txn.commit(); + txn.close(); + + + Transaction txn2 = pmemChannel.getTransaction(); + txn2.begin(); + pmemChannel.put(EventBuilder.withBody("blah".getBytes())); + pmemChannel.put(EventBuilder.withBody("blah".getBytes())); + txn2.commit(); + txn2.close(); + + txn2 = pmemChannel.getTransaction(); + txn2.begin(); + pmemChannel.take(); + txn2.commit(); + txn2.close(); + + Map<String, Map<String, String>> mbeans = JMXPollUtil.getAllMBeans(); + Assert.assertNotNull(mbeans); + Map<String, String> memBean = mbeans.get("CHANNEL.memChannel"); + Assert.assertNotNull(memBean); + JMXTestUtils.checkChannelCounterParams(memBean); + Map<String, String> pmemBean = mbeans.get("CHANNEL.pmemChannel"); + Assert.assertNotNull(pmemBean); + JMXTestUtils.checkChannelCounterParams(pmemBean); + memChannel.stop(); + pmemChannel.stop(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 40e385f..94212ec 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1842,6 +1842,56 @@ starts with ``org.apache.flume``): ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName); +JSON Reporting +-------------- +Flume can also report metrics in a JSON format. To enable reporting in JSON format, Flume hosts +a Web server on a configurable port. Flume reports metrics in the following JSON format: + +.. code-block:: java + + { + "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"}, + "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"} + } + +Here is an example: + +.. code-block:: java + + { + "CHANNEL.fileChannel":{"EventPutSuccessCount":"468085", + "Type":"CHANNEL", + "StopTime":"0", + "EventPutAttemptCount":"468086", + "ChannelSize":"233428", + "StartTime":"1344882233070", + "EventTakeSuccessCount":"458200", + "ChannelCapacity":"600000", + "EventTakeAttemptCount":"458288"}, + "CHANNEL.memChannel":{"EventPutSuccessCount":"22948908", + "Type":"CHANNEL", + "StopTime":"0", + "EventPutAttemptCount":"22948908", + "ChannelSize":"5", + "StartTime":"1344882209413", + "EventTakeSuccessCount":"22948900", + "ChannelCapacity":"100", + "EventTakeAttemptCount":"22948908"} + } + +======================= ======= ===================================================================================== +Property Name Default Description +======================= ======= ===================================================================================== +**type** -- The component type name, has to be ``HTTP`` +port 41414 The port to start the server on. +======================= ======= ===================================================================================== + +We can start Flume with Ganglia support as follows:: + + $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=HTTP -Dflume.monitoring.port=34545 + +Metrics will then be available at **http://<hostname>:<port>/metrics** webpage. +Custom components can report metrics as mentioned in the Ganglia section above. Custom Reporting ---------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ce01ec1c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7ac32d8..d6c813a 100644 --- a/pom.xml +++ b/pom.xml @@ -704,6 +704,13 @@ limitations under the License. <version>2.5-20110124</version> </dependency> + <!-- Gson: Java to Json conversion --> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.2.2</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId>
