Repository: ambari Updated Branches: refs/heads/trunk 15dbe3578 -> 8d27ac2b7
AMBARI-9185. Add Kafka metric sink implementation to enable sink to AMS. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8d27ac2b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8d27ac2b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8d27ac2b Branch: refs/heads/trunk Commit: 8d27ac2b77abdeeb61fd8d9bc6b51ce113fcd900 Parents: 15dbe35 Author: Siddharth Wagle <[email protected]> Authored: Fri Jan 16 12:37:46 2015 -0800 Committer: Siddharth Wagle <[email protected]> Committed: Fri Jan 16 12:37:46 2015 -0800 ---------------------------------------------------------------------- ambari-metrics/ambari-metrics-assembly/pom.xml | 47 +- .../src/main/assembly/sink-windows.xml | 8 + .../src/main/assembly/sink.xml | 8 + .../src/main/package/rpm/sink/postinstall.sh | 11 +- .../ambari-metrics-kafka-sink/pom.xml | 163 +++++++ .../src/main/assemblies/empty.xml | 21 + .../src/main/assemblies/jar-with-common.xml | 34 ++ .../kafka/KafkaTimelineMetricsReporter.java | 448 +++++++++++++++++++ .../KafkaTimelineMetricsReporterMBean.java | 25 ++ .../metrics2/sink/kafka/ScheduledReporter.java | 218 +++++++++ .../kafka/KafkaTimelineMetricsReporterTest.java | 109 +++++ .../sink/kafka/ScheduledReporterTest.java | 105 +++++ ambari-metrics/pom.xml | 7 +- .../0.8.1.2.2/configuration/kafka-broker.xml | 31 +- .../KAFKA/0.8.1.2.2/configuration/kafka-env.xml | 6 + .../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 3 + .../KAFKA/0.8.1.2.2/package/scripts/params.py | 27 ++ 17 files changed, 1259 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml index 1e1ca8c..60ba30b 100644 --- 
a/ambari-metrics/ambari-metrics-assembly/pom.xml +++ b/ambari-metrics/ambari-metrics-assembly/pom.xml @@ -36,6 +36,7 @@ <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir> <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir> <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir> + <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir> <python.ver>python >= 2.6</python.ver> <deb.python.ver>python (>= 2.6)</deb.python.ver> <deb.architecture>amd64</deb.architecture> @@ -43,6 +44,7 @@ <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar> <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar> <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar> + <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar> </properties> <build> @@ -447,7 +449,6 @@ <location>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</location> </source> </sources> - </mapping> <mapping> <directory>/usr/lib/flume/lib</directory> @@ -465,6 +466,22 @@ </source> </sources> </mapping> + <mapping> + <directory>/usr/lib/ambari-metrics-kafka-sink</directory> + <sources> + <source> + <location>${kafka-sink.dir}/target/${kafka.sink.jar}</location> + </source> + </sources> + </mapping> + <mapping> + <directory>/usr/lib/ambari-metrics-kafka-sink/lib</directory> + <sources> + <source> + <location>${kafka-sink.dir}/target/lib</location> + </source> + </sources> + </mapping> </mappings> </configuration> @@ -595,6 +612,7 @@ <path>/var/log/ambari-metrics-collector</path> <path>/var/lib/ambari-metrics-collector</path> <path>/usr/lib/ambari-metrics-hadoop-sink</path> + <path>/usr/lib/ambari-metrics-kafka-sink</path> <path>/usr/lib/flume/lib</path> <path>/usr/lib/storm/lib</path> </paths> @@ -771,6 +789,28 @@ 
</mapper> </data> + <!-- kafka sink --> + + <data> + <src>${kafka-sink.dir}/target/${kafka.sink.jar}</src> + <type>file</type> + <mapper> + <type>perm</type> + <filemode>644</filemode> + <dirmode>755</dirmode> + <prefix>/usr/lib/ambari-metrics-kafka-sink</prefix> + </mapper> + </data> + <data> + <src>${kafka-sink.dir}/target/lib</src> + <type>file</type> + <mapper> + <type>perm</type> + <filemode>644</filemode> + <dirmode>755</dirmode> + <prefix>/usr/lib/ambari-metrics-kafka-sink/lib</prefix> + </mapper> + </data> </dataSet> </configuration> </plugin> @@ -1063,6 +1103,11 @@ </dependency> <dependency> <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-kafka-sink</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> <artifactId>ambari-metrics-host-monitoring</artifactId> <version>${project.version}</version> <type>pom</type> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml index 0a36fac..38a8093 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml +++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml @@ -37,6 +37,10 @@ <directory>${storm-sink.dir}/src/main/conf</directory> <outputDirectory>hadoop-sink/conf</outputDirectory> </fileSet> + <fileSet> + <directory>${kafka-sink.dir}/target/lib</directory> + <outputDirectory>hadoop-sink/lib</outputDirectory> + </fileSet> </fileSets> <files> @@ -53,6 +57,10 @@ <outputDirectory>hadoop-sink</outputDirectory> </file> <file> + <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source> + 
<outputDirectory>hadoop-sink</outputDirectory> + </file> + <file> <source>${basedir}/src/main/package/msi/sink.wxs</source> <outputDirectory>../../</outputDirectory> <filtered>true</filtered> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml index 2426904..4a3b7c5 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml +++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml @@ -38,6 +38,10 @@ <directory>${storm-sink.dir}/src/main/conf</directory> <outputDirectory>hadoop-sink/conf</outputDirectory> </fileSet> + <fileSet> + <directory>${kafka-sink.dir}/target/lib</directory> + <outputDirectory>hadoop-sink/lib</outputDirectory> + </fileSet> </fileSets> <files> @@ -53,6 +57,10 @@ <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source> <outputDirectory>hadoop-sink</outputDirectory> </file> + <file> + <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source> + <outputDirectory>hadoop-sink</outputDirectory> + </file> </files> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh index 1955680..e87b9f0 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh +++ b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh @@ -15,15 +15,20 @@ # limitations under the License 
HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar" -FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar" HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}" + +FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar" FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}" + +KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" +KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}" + #link for storm jar not required with current loading #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}" #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar" -JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR}) -LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME}) +JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR}) +LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME}) for index in ${!LINKS[*]} do http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml new file mode 100644 index 0000000..55c6f07 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml @@ -0,0 +1,163 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>0.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>ambari-metrics-kafka-sink</artifactId> + <version>0.1.0-SNAPSHOT</version> + <packaging>jar</packaging> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.9</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <configuration> + <descriptors> + <descriptor>src/main/assemblies/jar-with-common.xml</descriptor> + </descriptors> + <attach>false</attach> + <tarLongFileMode>gnu</tarLongFileMode> + <appendAssemblyId>false</appendAssemblyId> + <finalName>${project.artifactId}-with-common-${project.version}</finalName> + </configuration> + <id>build-jar</id> + 
<phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.0</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>parse-version</id> + <phase>validate</phase> + <goals> + <goal>parse-version</goal> + </goals> + </execution> + <execution> + <id>regex-property</id> + <goals> + <goal>regex-property</goal> + </goals> + <configuration> + <name>ambariVersion</name> + <value>${project.version}</value> + <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex> + <replacement>$1.$2.$3</replacement> + <failIfNoMatch>false</failIfNoMatch> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>com.github.goldin</groupId> + <artifactId>copy-maven-plugin</artifactId> + <version>0.2.5</version> + <executions> + <execution> + <id>create-archive</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.8.1.1</version> + </dependency> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>2.2.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>4.10</version> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>3.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-easymock</artifactId> + <version>1.4.9</version> + <scope>test</scope> 
+ </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.4.9</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml new file mode 100644 index 0000000..17ff68a --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml @@ -0,0 +1,21 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly> + <id>empty</id> + <formats/> +</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml new file mode 100644 index 0000000..4fa8585 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml @@ -0,0 +1,34 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<assembly> + <id>jar-with-common</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <unpack>true</unpack> + <includes> + <include>org.apache.ambari:ambari-metrics-common</include> + <include>org.apache.ambari:ambari-metrics-kafka-sink</include> + </includes> + </dependencySet> + </dependencySets> +</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java new file mode 100644 index 0000000..762b5f2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import kafka.metrics.KafkaMetricsConfig; +import kafka.metrics.KafkaMetricsReporter; +import kafka.utils.VerifiableProperties; + +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.apache.hadoop.metrics2.util.Servers; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Metered; +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricProcessor; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.stats.Snapshot; + +public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter, + KafkaTimelineMetricsReporterMBean { + + private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class); + + private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval"; + private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize"; + private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host"; + 
private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port"; + private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled"; + private static final String TIMELINE_DEFAULT_HOST = "localhost"; + private static final String TIMELINE_DEFAULT_PORT = "8188"; + + private boolean initialized = false; + private boolean running = false; + private Object lock = new Object(); + private String collectorUri; + private String hostname; + private SocketAddress socketAddress; + private TimelineScheduledReporter reporter; + private TimelineMetricsCache metricsCache; + + @Override + protected SocketAddress getServerSocketAddress() { + return socketAddress; + } + + @Override + protected String getCollectorUri() { + return collectorUri; + } + + public void setMetricsCache(TimelineMetricsCache metricsCache) { + this.metricsCache = metricsCache; + } + + public void init(VerifiableProperties props) { + synchronized (lock) { + if (!initialized) { + LOG.info("Initializing Kafka Timeline Metrics Sink"); + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Could not identify hostname."); + throw new RuntimeException("Could not identify hostname.", e); + } + KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props); + int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, + String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); + int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, + String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); + String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST); + String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT); + setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval)); + collectorUri = "http://" + 
metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics"; + List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost, + Integer.parseInt(metricCollectorPort)); + if (socketAddresses != null && !socketAddresses.isEmpty()) { + socketAddress = socketAddresses.get(0); + } + initializeReporter(); + if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) { + startReporter(metricsConfig.pollingIntervalSecs()); + } + if (LOG.isTraceEnabled()) { + LOG.trace("CollectorUri = " + collectorUri); + LOG.trace("SocketAddress = " + socketAddress); + LOG.trace("MetricsSendInterval = " + metricsSendInterval); + LOG.trace("MaxRowCacheSize = " + maxRowCacheSize); + } + } + } + } + + public String getMBeanName() { + return "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"; + } + + public synchronized void startReporter(long period) { + synchronized (lock) { + if (initialized && !running) { + reporter.start(period, TimeUnit.SECONDS); + running = true; + LOG.info(String.format("Started Kafka Timeline metrics reporter with polling period %d seconds", period)); + } + } + } + + public synchronized void stopReporter() { + synchronized (lock) { + if (initialized && running) { + reporter.stop(); + running = false; + LOG.info("Stopped Kafka Timeline metrics reporter"); + initializeReporter(); + } + } + } + + private void initializeReporter() { + reporter = new TimelineScheduledReporter(Metrics.defaultRegistry(), "timeline-scheduled-reporter", + TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + initialized = true; + } + + interface Context { + public List<TimelineMetric> getTimelineMetricList(); + } + + class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> { + + private static final String APP_ID = "kafka_broker"; + private static final String COUNT_SUFIX = ".count"; + private static final String ONE_MINUTE_RATE_SUFIX = ".1MinuteRate"; + private static final String MEAN_SUFIX = ".mean"; 
+ private static final String MEAN_RATE_SUFIX = ".meanRate"; + private static final String FIVE_MINUTE_RATE_SUFIX = ".5MinuteRate"; + private static final String FIFTEEN_MINUTE_RATE_SUFIX = ".15MinuteRate"; + private static final String MIN_SUFIX = ".min"; + private static final String MAX_SUFIX = ".max"; + private static final String MEDIAN_SUFIX = ".median"; + private static final String STD_DEV_SUFIX = "stddev"; + private static final String SEVENTY_FIFTH_PERCENTILE_SUFIX = ".75percentile"; + private static final String NINETY_FIFTH_PERCENTILE_SUFIX = ".95percentile"; + private static final String NINETY_EIGHTH_PERCENTILE_SUFIX = ".98percentile"; + private static final String NINETY_NINTH_PERCENTILE_SUFIX = ".99percentile"; + private static final String NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX = ".999percentile"; + + protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) { + super(registry, name, rateUnit, durationUnit); + } + + @Override + public void report(Set<Entry<MetricName, Metric>> metrics) { + final List<TimelineMetric> metricsList = new ArrayList<TimelineMetric>(); + try { + for (Entry<MetricName, Metric> entry : metrics) { + final MetricName metricName = entry.getKey(); + final Metric metric = entry.getValue(); + Context context = new Context() { + + public List<TimelineMetric> getTimelineMetricList() { + return metricsList; + } + + }; + metric.processWith(this, metricName, context); + } + } catch (Throwable t) { + LOG.error("Exception processing Kafka metric", t); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Metrics List size: " + metricsList.size()); + LOG.trace("Metics Set size: " + metrics.size()); + } + if (!metricsList.isEmpty()) { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(metricsList); + try { + emitMetrics(timelineMetrics); + } catch (IOException e) { + LOG.error("Unexpected error", e); + } catch (Throwable t) { + LOG.error("Exception 
emitting metrics", t); + } + } + } + + private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, + Number attributeValue) { + if (LOG.isTraceEnabled()) { + LOG.trace("Creating timeline metric: " + attributeName + " = " + attributeValue + " time = " + + currentTimeMillis + " app_id = " + component); + } + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(attributeName); + timelineMetric.setHostName(hostname); + timelineMetric.setAppId(component); + timelineMetric.setStartTime(currentTimeMillis); + timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number")); + timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue()); + return timelineMetric; + } + + @Override + public void processMeter(MetricName name, Metered meter, Context context) throws Exception { + final long currentTimeMillis = System.currentTimeMillis(); + final String sanitizedName = sanitizeName(name); + final String meterCountName = sanitizedName + COUNT_SUFIX; + final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count()); + + final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX; + final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, + meterOneMinuteRateName, meter.oneMinuteRate()); + + final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX; + final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName, + meter.meanRate()); + + final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX; + final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, + meterFiveMinuteRateName, meter.fiveMinuteRate()); + + final String meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX; + final TimelineMetric fifteenMinuteRateMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, + meterFifteenMinuteRateName, meter.fifteenMinuteRate()); + + metricsCache.putTimelineMetric(countMetric); + metricsCache.putTimelineMetric(oneMinuteRateMetric); + metricsCache.putTimelineMetric(meanMetric); + metricsCache.putTimelineMetric(fiveMinuteRateMetric); + metricsCache.putTimelineMetric(fifteenMinuteRateMetric); + + String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName, + meterFiveMinuteRateName, meterFifteenMinuteRateName }; + populateMetricsList(context, metricNames); + } + + @Override + public void processCounter(MetricName name, Counter counter, Context context) throws Exception { + final long currentTimeMillis = System.currentTimeMillis(); + final String sanitizedName = sanitizeName(name); + final String metricCountName = sanitizedName + COUNT_SUFIX; + final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count()); + metricsCache.putTimelineMetric(metric); + populateMetricsList(context, metricCountName); + } + + @Override + public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception { + final long currentTimeMillis = System.currentTimeMillis(); + final Snapshot snapshot = histogram.getSnapshot(); + final String sanitizedName = sanitizeName(name); + + final String histogramMinName = sanitizedName + MIN_SUFIX; + final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName, + histogram.min()); + + final String histogramMaxName = sanitizedName + MAX_SUFIX; + final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName, + histogram.max()); + + final String histogramMeanName = sanitizedName + MEAN_SUFIX; + final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName, + histogram.mean()); + + final String histogramMedianName = sanitizedName + MEDIAN_SUFIX; + final 
TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName, + snapshot.getMedian()); + + final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX; + final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName, + histogram.stdDev()); + + final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX; + final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + histogramSeventyFifthPercentileName, snapshot.get75thPercentile()); + + final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX; + final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + histogramNinetyFifthPercentileName, snapshot.get95thPercentile()); + + final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX; + final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + histogramNinetyEighthPercentileName, snapshot.get98thPercentile()); + + final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX; + final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + histogramNinetyNinethPercentileName, snapshot.get99thPercentile()); + + final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX; + final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile()); + + metricsCache.putTimelineMetric(minMetric); + metricsCache.putTimelineMetric(maxMetric); + metricsCache.putTimelineMetric(meanMetric); + metricsCache.putTimelineMetric(medianMetric); + metricsCache.putTimelineMetric(stdDevMetric); + 
metricsCache.putTimelineMetric(seventyFifthPercentileMetric); + metricsCache.putTimelineMetric(nintyFifthPercentileMetric); + metricsCache.putTimelineMetric(nintyEighthPercentileMetric); + metricsCache.putTimelineMetric(nintyNinthPercentileMetric); + metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric); + + String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName, + histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName, + histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName, + histogramSeventyFifthPercentileName, histogramStdDevName }; + populateMetricsList(context, metricNames); + } + + @Override + public void processTimer(MetricName name, Timer timer, Context context) throws Exception { + final long currentTimeMillis = System.currentTimeMillis(); + final Snapshot snapshot = timer.getSnapshot(); + final String sanitizedName = sanitizeName(name); + + final String timerMinName = sanitizedName + MIN_SUFIX; + final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min()); + + final String timerMaxName = sanitizedName + MAX_SUFIX; + final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max()); + + final String timerMeanName = sanitizedName + MEAN_SUFIX; + final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean()); + + final String timerMedianName = sanitizedName + MEDIAN_SUFIX; + final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName, + snapshot.getMedian()); + + final String timerStdDevName = sanitizedName + STD_DEV_SUFIX; + final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName, + timer.stdDev()); + + final String timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX; + final TimelineMetric 
seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + timerSeventyFifthPercentileName, snapshot.get75thPercentile()); + + final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX; + final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + timerNinetyFifthPercentileName, snapshot.get95thPercentile()); + + final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX; + final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + timerNinetyEighthPercentileName, snapshot.get98thPercentile()); + + final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX; + final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + timerNinetyNinthPercentileName, snapshot.get99thPercentile()); + + final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX; + final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, + timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile()); + + metricsCache.putTimelineMetric(minMetric); + metricsCache.putTimelineMetric(maxMetric); + metricsCache.putTimelineMetric(meanMetric); + metricsCache.putTimelineMetric(medianMetric); + metricsCache.putTimelineMetric(stdDevMetric); + metricsCache.putTimelineMetric(seventyFifthPercentileMetric); + metricsCache.putTimelineMetric(nintyFifthPercentileMetric); + metricsCache.putTimelineMetric(nintyEighthPercentileMetric); + metricsCache.putTimelineMetric(nintyNinthPercentileMetric); + metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric); + + String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName, + timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName, + 
timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName }; + populateMetricsList(context, metricNames); + } + + @Override + public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception { + final long currentTimeMillis = System.currentTimeMillis(); + final String sanitizedName = sanitizeName(name); + final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName, + Double.parseDouble(String.valueOf(gauge.value()))); + metricsCache.putTimelineMetric(metric); + populateMetricsList(context, sanitizedName); + } + + private void populateMetricsList(Context context, String... metricNames) { + for (String metricName : metricNames) { + TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName); + if (cachedMetric != null) { + context.getTimelineMetricList().add(cachedMetric); + } + } + } + + protected String sanitizeName(MetricName name) { + if (name == null) { + return ""; + } + final String qualifiedTypeName = name.getGroup() + "." + name.getType() + "." + name.getName(); + final String metricName = name.hasScope() ? qualifiedTypeName + '.' 
+ name.getScope() : qualifiedTypeName; + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < metricName.length(); i++) { + final char p = metricName.charAt(i); + if (!(p >= 'A' && p <= 'Z') && !(p >= 'a' && p <= 'z') && !(p >= '0' && p <= '9') && (p != '_') && (p != '-') + && (p != '.') && (p != '\0')) { + sb.append('_'); + } else { + sb.append(p); + } + } + return sb.toString(); + } + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java new file mode 100644 index 0000000..7f6c5c9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import kafka.metrics.KafkaMetricsReporterMBean; + +public interface KafkaTimelineMetricsReporterMBean extends KafkaMetricsReporterMBean { + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java new file mode 100644 index 0000000..f4f8333 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import java.io.Closeable; +import java.util.Locale; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.reporting.ConsoleReporter; +import com.yammer.metrics.reporting.CsvReporter; + +/** + * The abstract base class for all scheduled reporters (i.e., reporters which + * process a registry's metrics periodically). + * + * @see ConsoleReporter + * @see CsvReporter + * @see Slf4jReporter + */ +public abstract class ScheduledReporter implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class); + + /** + * A simple named thread factory. + */ + @SuppressWarnings("NullableProblems") + private static class NamedThreadFactory implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + private NamedThreadFactory(String name) { + final SecurityManager s = System.getSecurityManager(); + this.group = (s != null) ? 
s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.namePrefix = "metrics-" + name + "-thread-"; + } + + @Override + public Thread newThread(Runnable r) { + final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } + + private static final AtomicInteger FACTORY_ID = new AtomicInteger(); + + private final MetricsRegistry registry; + private final ScheduledExecutorService executor; + private final double durationFactor; + private final String durationUnit; + private final double rateFactor; + private final String rateUnit; + + /** + * Creates a new {@link ScheduledReporter} instance. + * + * @param registry + * the {@link com.yammer.metrics.core.MetricsRegistry} containing the + * metrics this reporter will report + * @param name + * the reporter's name + * @param rateUnit + * a unit of time + * @param durationUnit + * a unit of time + */ + protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) { + this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-' + + FACTORY_ID.incrementAndGet()))); + } + + /** + * Creates a new {@link ScheduledReporter} instance. + * + * @param registry + * the {@link com.yammer.metrics.core.MetricsRegistry} containing the + * metrics this reporter will report + * @param executor + * the executor to use while scheduling reporting of metrics. 
+ */ + protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit, + ScheduledExecutorService executor) { + this.registry = registry; + this.executor = executor; + this.rateFactor = rateUnit.toSeconds(1); + this.rateUnit = calculateRateUnit(rateUnit); + this.durationFactor = 1.0 / durationUnit.toNanos(1); + this.durationUnit = durationUnit.toString().toLowerCase(Locale.US); + } + + /** + * Starts the reporter polling at the given period. + * + * @param period + * the amount of time between polls + * @param unit + * the unit for {@code period} + */ + public void start(long period, TimeUnit unit) { + executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + report(); + } catch (RuntimeException ex) { + LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this + .getClass().getSimpleName(), ex); + } + } + }, period, period, unit); + } + + /** + * Stops the reporter and shuts down its thread of execution. + * + * Uses the shutdown pattern from + * http://docs.oracle.com/javase/7/docs/api/java + * /util/concurrent/ExecutorService.html + */ + public void stop() { + executor.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + /** + * Stops the reporter and shuts down its thread of execution. 
+ */ + @Override + public void close() { + stop(); + } + + /** + * Report the current values of all metrics in the registry. + */ + public void report() { + synchronized (this) { + report(registry.allMetrics().entrySet()); + } + } + + /** + * Called periodically by the polling thread. Subclasses should report all the + * given metrics. + * + * @param metrics + * all the metrics in the registry + */ + public abstract void report(Set<Entry<MetricName, Metric>> metrics); + + protected String getRateUnit() { + return rateUnit; + } + + protected String getDurationUnit() { + return durationUnit; + } + + protected double convertDuration(double duration) { + return duration * durationFactor; + } + + protected double convertRate(double rate) { + return rate * rateFactor; + } + + private String calculateRateUnit(TimeUnit unit) { + final String s = unit.toString().toLowerCase(Locale.US); + return s.substring(0, s.length() - 1); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java new file mode 100644 index 0000000..67c61e1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import static org.mockito.Mockito.mock; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.verifyAll; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import kafka.utils.VerifiableProperties; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.Timer; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ Metrics.class, HttpClient.class }) +@PowerMockIgnore("javax.management.*") 
+public class KafkaTimelineMetricsReporterTest { + + private final List<Metric> list = new ArrayList<Metric>(); + private final MetricsRegistry registry = new MetricsRegistry(); + @SuppressWarnings("rawtypes") + private final Gauge gauge = mock(Gauge.class); + private final KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter = new KafkaTimelineMetricsReporter(); + private VerifiableProperties props; + + @Before + public void setUp() throws Exception { + @SuppressWarnings({ "rawtypes", "unchecked" }) + Gauge g = registry.newGauge(System.class, "gauge", gauge); + Counter counter = registry.newCounter(System.class, "counter"); + Histogram histogram = registry.newHistogram(System.class, "histogram"); + Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS); + Timer timer = registry.newTimer(System.class, "timer"); + list.add(g); + list.add(counter); + list.add(histogram); + list.add(meter); + list.add(timer); + Properties properties = new Properties(); + properties.setProperty("kafka.timeline.metrics.sendInterval", "5900"); + properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000"); + properties.setProperty("kafka.timeline.metrics.host", "localhost"); + properties.setProperty("kafka.timeline.metrics.port", "8188"); + properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true"); + props = new VerifiableProperties(properties); + } + + @Test + public void testReporterStartStop() { + mockStatic(Metrics.class); + EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2); + TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter); + kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache); + HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class); + kafkaTimelineMetricsReporter.setHttpClient(httpClient); + replay(Metrics.class, httpClient, timelineMetricsCache); + kafkaTimelineMetricsReporter.init(props); + 
kafkaTimelineMetricsReporter.stopReporter(); + verifyAll(); + } + + private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) { + TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class); + kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache); + EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class)); + EasyMock.expectLastCall().once(); + return timelineMetricsCache; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java new file mode 100644 index 0000000..41f9126 --- /dev/null +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.Metric; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.Timer; + +public class ScheduledReporterTest { + private final Gauge gauge = mock(Gauge.class); + private final List<Metric> list = new ArrayList<Metric>(); + private final MetricsRegistry registry = new MetricsRegistry(); + private final ScheduledReporter reporter = spy(new ScheduledReporter(registry, "example", TimeUnit.SECONDS, + TimeUnit.MILLISECONDS) { + @Override + public void report(Set<Entry<MetricName, Metric>> metrics) { + // nothing doing! 
+ } + }); + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + Gauge g = registry.newGauge(System.class, "gauge", gauge); + Counter counter = registry.newCounter(System.class, "counter"); + Histogram histogram = registry.newHistogram(System.class, "histogram"); + Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS); + Timer timer = registry.newTimer(System.class, "timer"); + list.add(g); + list.add(counter); + list.add(histogram); + list.add(meter); + list.add(timer); + reporter.start(200, TimeUnit.MILLISECONDS); + } + + @After + public void tearDown() throws Exception { + reporter.stop(); + } + + @Test + public void pollsPeriodically() throws Exception { + Thread.sleep(500); + verify(reporter, times(2)).report(set(list)); + } + + private Set<Entry<MetricName, Metric>> set(List<Metric> metrics) { + final Map<MetricName, Metric> map = new HashMap<MetricName, Metric>(); + for (Metric metric : metrics) { + String name = null; + if (metric instanceof Gauge) { + name = "gauge"; + } else if (metric instanceof Counter) { + name = "counter"; + } else if (metric instanceof Histogram) { + name = "histogram"; + } else if (metric instanceof Meter) { + name = "meter"; + } else if (metric instanceof Timer) { + name = "timer"; + } + map.put(new MetricName(System.class, name), metric); + } + return map.entrySet(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index d99cc82..22067d9 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -14,11 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 - http://maven.apache.org/xsd/maven-4.0.0.xsd"> +--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>org.apache.ambari</groupId> <modelVersion>4.0.0</modelVersion> @@ -29,6 +25,7 @@ <module>ambari-metrics-common</module> <module>ambari-metrics-hadoop-sink</module> <module>ambari-metrics-flume-sink</module> + <module>ambari-metrics-kafka-sink</module> <module>ambari-metrics-storm-sink</module> <module>ambari-metrics-timelineservice</module> <module>ambari-metrics-host-monitoring</module> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml index dc1b6b4..9c11007 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml @@ -101,7 +101,7 @@ <name>num.partitions</name> <value>1</value> <description> - The default number of partitions per topic. + The default number of partitions per topic. 
</description> </property> <property> @@ -291,9 +291,9 @@ </property> <property> <name>kafka.metrics.reporters</name> - <value>kafka.ganglia.KafkaGangliaMetricsReporter</value> + <value>{{kafka_metrics_reporters}}</value> <description> - kafka ganglia metrics reporter + kafka ganglia metrics reporter and kafka timeline metrics reporter </description> </property> <property> @@ -318,4 +318,29 @@ <value>kafka</value> <description>Ganglia group name </description> </property> + <property> + <name>kafka.timeline.metrics.reporter.enabled</name> + <value>true</value> + <description>Kafka timeline metrics reporter enable</description> + </property> + <property> + <name>kafka.timeline.metrics.host</name> + <value>{{metric_collector_host}}</value> + <description>Timeline host</description> + </property> + <property> + <name>kafka.timeline.metrics.port</name> + <value>{{metric_collector_port}}</value> + <description>Timeline port</description> + </property> + <property> + <name>kafka.timeline.metrics.reporter.sendInterval</name> + <value>5900</value> + <description>Timeline metrics reporter send interval</description> + </property> + <property> + <name>kafka.timeline.metrics.maxRowCacheSize</name> + <value>10000</value> + <description>Timeline metrics reporter maximum row cache size</description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml index 7ad4396..f322406 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml @@ -50,6 +50,12 
@@ # The java implementation to use. export JAVA_HOME={{java64_home}} export PATH=$PATH:$JAVA_HOME/bin + +# Add kafka sink to classpath and related dependencies +if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then + export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar + export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/* +fi </value> </property> </configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py index c0231a8..33ee47a 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py @@ -35,6 +35,9 @@ def kafka(): kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker']) kafka_server_config['broker.id'] = brokerid kafka_server_config['host.name'] = params.hostname + kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host + kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port + kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters kafka_data_dir = kafka_server_config['log.dirs'] Directory(filter(None,kafka_data_dir.split(",")), owner=params.kafka_user, http://git-wip-us.apache.org/repos/asf/ambari/blob/8d27ac2b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py index 067e537..800ccc4 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py @@ -60,3 +60,30 @@ if (('kafka-log4j' in config['configurations']) and ('content' in config['config log4j_props = config['configurations']['kafka-log4j']['content'] else: log4j_props = None + +if 'ganglia_server_host' in config['clusterHostInfo'] and \ + len(config['clusterHostInfo']['ganglia_server_host'])>0: + ganglia_installed = True + ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0] + ganglia_report_interval = 60 +else: + ganglia_installed = False + +kafka_metrics_reporters="" + +if ganglia_installed: + kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter" + +ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", []) +has_metric_collector = not len(ams_collector_hosts) == 0 + +if has_metric_collector: + metric_collector_host = ams_collector_hosts[0] + metric_collector_port = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:8188") + if metric_collector_port and metric_collector_port.find(':') != -1: + metric_collector_port = metric_collector_port.split(':')[1] + + if not len(kafka_metrics_reporters) == 0: + kafka_metrics_reporters = kafka_metrics_reporters + ',' + + kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"
