Repository: ambari
Updated Branches:
  refs/heads/trunk d7ca3036e -> f1d354cca
Revert "AMBARI-9185. Add Kafka metric sink implementation to enable sink to AMS. Build failure."

This reverts commit 8d27ac2b77abdeeb61fd8d9bc6b51ce113fcd900.

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f1d354cc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f1d354cc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f1d354cc

Branch: refs/heads/trunk
Commit: f1d354ccae8f95a63735628c133183a87249d1b2
Parents: d7ca303
Author: Siddharth Wagle <[email protected]>
Authored: Fri Jan 16 15:25:57 2015 -0800
Committer: Siddharth Wagle <[email protected]>
Committed: Fri Jan 16 15:25:57 2015 -0800

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-assembly/pom.xml | 47 +-
 .../src/main/assembly/sink-windows.xml | 8 -
 .../src/main/assembly/sink.xml | 8 -
 .../src/main/package/rpm/sink/postinstall.sh | 11 +-
 .../ambari-metrics-kafka-sink/pom.xml | 163 -------
 .../src/main/assemblies/empty.xml | 21 -
 .../src/main/assemblies/jar-with-common.xml | 34 --
 .../kafka/KafkaTimelineMetricsReporter.java | 448 -------------------
 .../KafkaTimelineMetricsReporterMBean.java | 25 --
 .../metrics2/sink/kafka/ScheduledReporter.java | 218 ---------
 .../kafka/KafkaTimelineMetricsReporterTest.java | 109 -----
 .../sink/kafka/ScheduledReporterTest.java | 105 -----
 ambari-metrics/pom.xml | 7 +-
 .../0.8.1.2.2/configuration/kafka-broker.xml | 31 +-
 .../KAFKA/0.8.1.2.2/configuration/kafka-env.xml | 6 -
 .../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 3 -
 .../KAFKA/0.8.1.2.2/package/scripts/params.py | 27 --
 17 files changed, 12 insertions(+), 1259 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml 
b/ambari-metrics/ambari-metrics-assembly/pom.xml index 60ba30b..1e1ca8c 100644 --- a/ambari-metrics/ambari-metrics-assembly/pom.xml +++ b/ambari-metrics/ambari-metrics-assembly/pom.xml @@ -36,7 +36,6 @@ <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir> <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir> <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir> - <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir> <python.ver>python >= 2.6</python.ver> <deb.python.ver>python (>= 2.6)</deb.python.ver> <deb.architecture>amd64</deb.architecture> @@ -44,7 +43,6 @@ <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar> <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar> <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar> - <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar> </properties> <build> @@ -449,6 +447,7 @@ <location>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</location> </source> </sources> + </mapping> <mapping> <directory>/usr/lib/flume/lib</directory> @@ -466,22 +465,6 @@ </source> </sources> </mapping> - <mapping> - <directory>/usr/lib/ambari-metrics-kafka-sink</directory> - <sources> - <source> - <location>${kafka-sink.dir}/target/${kafka.sink.jar}</location> - </source> - </sources> - </mapping> - <mapping> - <directory>/usr/lib/ambari-metrics-kafka-sink/lib</directory> - <sources> - <source> - <location>${kafka-sink.dir}/target/lib</location> - </source> - </sources> - </mapping> </mappings> </configuration> @@ -612,7 +595,6 @@ <path>/var/log/ambari-metrics-collector</path> <path>/var/lib/ambari-metrics-collector</path> <path>/usr/lib/ambari-metrics-hadoop-sink</path> - <path>/usr/lib/ambari-metrics-kafka-sink</path> 
<path>/usr/lib/flume/lib</path> <path>/usr/lib/storm/lib</path> </paths> @@ -789,28 +771,6 @@ </mapper> </data> - <!-- kafka sink --> - - <data> - <src>${kafka-sink.dir}/target/${kafka.sink.jar}</src> - <type>file</type> - <mapper> - <type>perm</type> - <filemode>644</filemode> - <dirmode>755</dirmode> - <prefix>/usr/lib/ambari-metrics-kafka-sink</prefix> - </mapper> - </data> - <data> - <src>${kafka-sink.dir}/target/lib</src> - <type>file</type> - <mapper> - <type>perm</type> - <filemode>644</filemode> - <dirmode>755</dirmode> - <prefix>/usr/lib/ambari-metrics-kafka-sink/lib</prefix> - </mapper> - </data> </dataSet> </configuration> </plugin> @@ -1103,11 +1063,6 @@ </dependency> <dependency> <groupId>org.apache.ambari</groupId> - <artifactId>ambari-metrics-kafka-sink</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.ambari</groupId> <artifactId>ambari-metrics-host-monitoring</artifactId> <version>${project.version}</version> <type>pom</type> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml index 38a8093..0a36fac 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml +++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml @@ -37,10 +37,6 @@ <directory>${storm-sink.dir}/src/main/conf</directory> <outputDirectory>hadoop-sink/conf</outputDirectory> </fileSet> - <fileSet> - <directory>${kafka-sink.dir}/target/lib</directory> - <outputDirectory>hadoop-sink/lib</outputDirectory> - </fileSet> </fileSets> <files> @@ -57,10 +53,6 @@ <outputDirectory>hadoop-sink</outputDirectory> </file> <file> - 
<source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source> - <outputDirectory>hadoop-sink</outputDirectory> - </file> - <file> <source>${basedir}/src/main/package/msi/sink.wxs</source> <outputDirectory>../../</outputDirectory> <filtered>true</filtered> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml index 4a3b7c5..2426904 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml +++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml @@ -38,10 +38,6 @@ <directory>${storm-sink.dir}/src/main/conf</directory> <outputDirectory>hadoop-sink/conf</outputDirectory> </fileSet> - <fileSet> - <directory>${kafka-sink.dir}/target/lib</directory> - <outputDirectory>hadoop-sink/lib</outputDirectory> - </fileSet> </fileSets> <files> @@ -57,10 +53,6 @@ <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source> <outputDirectory>hadoop-sink</outputDirectory> </file> - <file> - <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source> - <outputDirectory>hadoop-sink</outputDirectory> - </file> </files> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh index e87b9f0..1955680 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh +++ 
b/ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh @@ -15,20 +15,15 @@ # limitations under the License HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar" -HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}" - FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar" +HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}" FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}" - -KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" -KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}" - #link for storm jar not required with current loading #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}" #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar" -JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR}) -LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME}) +JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR}) +LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME}) for index in ${!LINKS[*]} do http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml deleted file mode 100644 index 55c6f07..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml +++ /dev/null @@ -1,163 +0,0 @@ -<?<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 - http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>ambari-metrics</artifactId> - <groupId>org.apache.ambari</groupId> - <version>0.1.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>ambari-metrics-kafka-sink</artifactId> - <version>0.1.0-SNAPSHOT</version> - <packaging>jar</packaging> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.9</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds> - <outputDirectory>${project.build.directory}/lib</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <configuration> - <descriptors> - <descriptor>src/main/assemblies/jar-with-common.xml</descriptor> - </descriptors> - <attach>false</attach> - <tarLongFileMode>gnu</tarLongFileMode> - <appendAssemblyId>false</appendAssemblyId> - <finalName>${project.artifactId}-with-common-${project.version}</finalName> - </configuration> - <id>build-jar</id> - 
<phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.0</version> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.8</version> - <executions> - <execution> - <id>parse-version</id> - <phase>validate</phase> - <goals> - <goal>parse-version</goal> - </goals> - </execution> - <execution> - <id>regex-property</id> - <goals> - <goal>regex-property</goal> - </goals> - <configuration> - <name>ambariVersion</name> - <value>${project.version}</value> - <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex> - <replacement>$1.$2.$3</replacement> - <failIfNoMatch>false</failIfNoMatch> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>com.github.goldin</groupId> - <artifactId>copy-maven-plugin</artifactId> - <version>0.2.5</version> - <executions> - <execution> - <id>create-archive</id> - <phase>none</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> - <dependencies> - <dependency> - <groupId>org.apache.ambari</groupId> - <artifactId>ambari-metrics-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.1.1</version> - </dependency> - <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>2.2.0</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - <version>4.10</version> - </dependency> - <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymock</artifactId> - <version>3.2</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-easymock</artifactId> - <version>1.4.9</version> - <scope>test</scope> 
- </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - <version>1.4.9</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.9.5</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml deleted file mode 100644 index 17ff68a..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml +++ /dev/null @@ -1,21 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<assembly> - <id>empty</id> - <formats/> -</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml b/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml deleted file mode 100644 index 4fa8585..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml +++ /dev/null @@ -1,34 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<assembly> - <id>jar-with-common</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <unpack>true</unpack> - <includes> - <include>org.apache.ambari:ambari-metrics-common</include> - <include>org.apache.ambari:ambari-metrics-kafka-sink</include> - </includes> - </dependencySet> - </dependencySets> -</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java deleted file mode 100644 index 762b5f2..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ /dev/null @@ -1,448 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metrics2.sink.kafka; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import kafka.metrics.KafkaMetricsConfig; -import kafka.metrics.KafkaMetricsReporter; -import kafka.utils.VerifiableProperties; - -import org.apache.commons.lang.ClassUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.apache.hadoop.metrics2.util.Servers; - -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Counter; -import com.yammer.metrics.core.Gauge; -import com.yammer.metrics.core.Histogram; -import com.yammer.metrics.core.Metered; -import com.yammer.metrics.core.Metric; -import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricProcessor; -import com.yammer.metrics.core.MetricsRegistry; -import com.yammer.metrics.core.Timer; -import com.yammer.metrics.stats.Snapshot; - -public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter, - KafkaTimelineMetricsReporterMBean { - - private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class); - - private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval"; - private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = 
"kafka.timeline.metrics.maxRowCacheSize"; - private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host"; - private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port"; - private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled"; - private static final String TIMELINE_DEFAULT_HOST = "localhost"; - private static final String TIMELINE_DEFAULT_PORT = "8188"; - - private boolean initialized = false; - private boolean running = false; - private Object lock = new Object(); - private String collectorUri; - private String hostname; - private SocketAddress socketAddress; - private TimelineScheduledReporter reporter; - private TimelineMetricsCache metricsCache; - - @Override - protected SocketAddress getServerSocketAddress() { - return socketAddress; - } - - @Override - protected String getCollectorUri() { - return collectorUri; - } - - public void setMetricsCache(TimelineMetricsCache metricsCache) { - this.metricsCache = metricsCache; - } - - public void init(VerifiableProperties props) { - synchronized (lock) { - if (!initialized) { - LOG.info("Initializing Kafka Timeline Metrics Sink"); - try { - hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - LOG.error("Could not identify hostname."); - throw new RuntimeException("Could not identify hostname.", e); - } - KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props); - int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, - String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); - int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, - String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); - String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST); - String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, 
TIMELINE_DEFAULT_PORT); - setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval)); - collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics"; - List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost, - Integer.parseInt(metricCollectorPort)); - if (socketAddresses != null && !socketAddresses.isEmpty()) { - socketAddress = socketAddresses.get(0); - } - initializeReporter(); - if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) { - startReporter(metricsConfig.pollingIntervalSecs()); - } - if (LOG.isTraceEnabled()) { - LOG.trace("CollectorUri = " + collectorUri); - LOG.trace("SocketAddress = " + socketAddress); - LOG.trace("MetricsSendInterval = " + metricsSendInterval); - LOG.trace("MaxRowCacheSize = " + maxRowCacheSize); - } - } - } - } - - public String getMBeanName() { - return "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"; - } - - public synchronized void startReporter(long period) { - synchronized (lock) { - if (initialized && !running) { - reporter.start(period, TimeUnit.SECONDS); - running = true; - LOG.info(String.format("Started Kafka Timeline metrics reporter with polling period %d seconds", period)); - } - } - } - - public synchronized void stopReporter() { - synchronized (lock) { - if (initialized && running) { - reporter.stop(); - running = false; - LOG.info("Stopped Kafka Timeline metrics reporter"); - initializeReporter(); - } - } - } - - private void initializeReporter() { - reporter = new TimelineScheduledReporter(Metrics.defaultRegistry(), "timeline-scheduled-reporter", - TimeUnit.SECONDS, TimeUnit.MILLISECONDS); - initialized = true; - } - - interface Context { - public List<TimelineMetric> getTimelineMetricList(); - } - - class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> { - - private static final String APP_ID = "kafka_broker"; - private static final String 
COUNT_SUFIX = ".count"; - private static final String ONE_MINUTE_RATE_SUFIX = ".1MinuteRate"; - private static final String MEAN_SUFIX = ".mean"; - private static final String MEAN_RATE_SUFIX = ".meanRate"; - private static final String FIVE_MINUTE_RATE_SUFIX = ".5MinuteRate"; - private static final String FIFTEEN_MINUTE_RATE_SUFIX = ".15MinuteRate"; - private static final String MIN_SUFIX = ".min"; - private static final String MAX_SUFIX = ".max"; - private static final String MEDIAN_SUFIX = ".median"; - private static final String STD_DEV_SUFIX = "stddev"; - private static final String SEVENTY_FIFTH_PERCENTILE_SUFIX = ".75percentile"; - private static final String NINETY_FIFTH_PERCENTILE_SUFIX = ".95percentile"; - private static final String NINETY_EIGHTH_PERCENTILE_SUFIX = ".98percentile"; - private static final String NINETY_NINTH_PERCENTILE_SUFIX = ".99percentile"; - private static final String NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX = ".999percentile"; - - protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) { - super(registry, name, rateUnit, durationUnit); - } - - @Override - public void report(Set<Entry<MetricName, Metric>> metrics) { - final List<TimelineMetric> metricsList = new ArrayList<TimelineMetric>(); - try { - for (Entry<MetricName, Metric> entry : metrics) { - final MetricName metricName = entry.getKey(); - final Metric metric = entry.getValue(); - Context context = new Context() { - - public List<TimelineMetric> getTimelineMetricList() { - return metricsList; - } - - }; - metric.processWith(this, metricName, context); - } - } catch (Throwable t) { - LOG.error("Exception processing Kafka metric", t); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Metrics List size: " + metricsList.size()); - LOG.trace("Metics Set size: " + metrics.size()); - } - if (!metricsList.isEmpty()) { - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(metricsList); - try { 
- emitMetrics(timelineMetrics); - } catch (IOException e) { - LOG.error("Unexpected error", e); - } catch (Throwable t) { - LOG.error("Exception emitting metrics", t); - } - } - } - - private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, - Number attributeValue) { - if (LOG.isTraceEnabled()) { - LOG.trace("Creating timeline metric: " + attributeName + " = " + attributeValue + " time = " - + currentTimeMillis + " app_id = " + component); - } - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(attributeName); - timelineMetric.setHostName(hostname); - timelineMetric.setAppId(component); - timelineMetric.setStartTime(currentTimeMillis); - timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number")); - timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue()); - return timelineMetric; - } - - @Override - public void processMeter(MetricName name, Metered meter, Context context) throws Exception { - final long currentTimeMillis = System.currentTimeMillis(); - final String sanitizedName = sanitizeName(name); - final String meterCountName = sanitizedName + COUNT_SUFIX; - final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count()); - - final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX; - final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, - meterOneMinuteRateName, meter.oneMinuteRate()); - - final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX; - final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName, - meter.meanRate()); - - final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX; - final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, - meterFiveMinuteRateName, meter.fiveMinuteRate()); - - final String 
meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX; - final TimelineMetric fifteenMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, - meterFifteenMinuteRateName, meter.fifteenMinuteRate()); - - metricsCache.putTimelineMetric(countMetric); - metricsCache.putTimelineMetric(oneMinuteRateMetric); - metricsCache.putTimelineMetric(meanMetric); - metricsCache.putTimelineMetric(fiveMinuteRateMetric); - metricsCache.putTimelineMetric(fifteenMinuteRateMetric); - - String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName, - meterFiveMinuteRateName, meterFifteenMinuteRateName }; - populateMetricsList(context, metricNames); - } - - @Override - public void processCounter(MetricName name, Counter counter, Context context) throws Exception { - final long currentTimeMillis = System.currentTimeMillis(); - final String sanitizedName = sanitizeName(name); - final String metricCountName = sanitizedName + COUNT_SUFIX; - final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count()); - metricsCache.putTimelineMetric(metric); - populateMetricsList(context, metricCountName); - } - - @Override - public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception { - final long currentTimeMillis = System.currentTimeMillis(); - final Snapshot snapshot = histogram.getSnapshot(); - final String sanitizedName = sanitizeName(name); - - final String histogramMinName = sanitizedName + MIN_SUFIX; - final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName, - histogram.min()); - - final String histogramMaxName = sanitizedName + MAX_SUFIX; - final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName, - histogram.max()); - - final String histogramMeanName = sanitizedName + MEAN_SUFIX; - final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, 
histogramMeanName, - histogram.mean()); - - final String histogramMedianName = sanitizedName + MEDIAN_SUFIX; - final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName, - snapshot.getMedian()); - - final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX; - final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName, - histogram.stdDev()); - - final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramSeventyFifthPercentileName, snapshot.get75thPercentile()); - - final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyFifthPercentileName, snapshot.get95thPercentile()); - - final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX; - final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyEighthPercentileName, snapshot.get98thPercentile()); - - final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyNinethPercentileName, snapshot.get99thPercentile()); - - final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile()); - - metricsCache.putTimelineMetric(minMetric); - metricsCache.putTimelineMetric(maxMetric); - metricsCache.putTimelineMetric(meanMetric); - 
metricsCache.putTimelineMetric(medianMetric); - metricsCache.putTimelineMetric(stdDevMetric); - metricsCache.putTimelineMetric(seventyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyEighthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric); - - String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName, - histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName, - histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName, - histogramSeventyFifthPercentileName, histogramStdDevName }; - populateMetricsList(context, metricNames); - } - - @Override - public void processTimer(MetricName name, Timer timer, Context context) throws Exception { - final long currentTimeMillis = System.currentTimeMillis(); - final Snapshot snapshot = timer.getSnapshot(); - final String sanitizedName = sanitizeName(name); - - final String timerMinName = sanitizedName + MIN_SUFIX; - final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min()); - - final String timerMaxName = sanitizedName + MAX_SUFIX; - final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max()); - - final String timerMeanName = sanitizedName + MEAN_SUFIX; - final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean()); - - final String timerMedianName = sanitizedName + MEDIAN_SUFIX; - final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName, - snapshot.getMedian()); - - final String timerStdDevName = sanitizedName + STD_DEV_SUFIX; - final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName, - timer.stdDev()); - - final String 
timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerSeventyFifthPercentileName, snapshot.get75thPercentile()); - - final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyFifthPercentileName, snapshot.get95thPercentile()); - - final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX; - final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyEighthPercentileName, snapshot.get98thPercentile()); - - final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyNinthPercentileName, snapshot.get99thPercentile()); - - final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile()); - - metricsCache.putTimelineMetric(minMetric); - metricsCache.putTimelineMetric(maxMetric); - metricsCache.putTimelineMetric(meanMetric); - metricsCache.putTimelineMetric(medianMetric); - metricsCache.putTimelineMetric(stdDevMetric); - metricsCache.putTimelineMetric(seventyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyEighthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric); - - String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName, - 
timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName, - timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName }; - populateMetricsList(context, metricNames); - } - - @Override - public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception { - final long currentTimeMillis = System.currentTimeMillis(); - final String sanitizedName = sanitizeName(name); - final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName, - Double.parseDouble(String.valueOf(gauge.value()))); - metricsCache.putTimelineMetric(metric); - populateMetricsList(context, sanitizedName); - } - - private void populateMetricsList(Context context, String... metricNames) { - for (String metricName : metricNames) { - TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName); - if (cachedMetric != null) { - context.getTimelineMetricList().add(cachedMetric); - } - } - } - - protected String sanitizeName(MetricName name) { - if (name == null) { - return ""; - } - final String qualifiedTypeName = name.getGroup() + "." + name.getType() + "." + name.getName(); - final String metricName = name.hasScope() ? qualifiedTypeName + '.' 
+ name.getScope() : qualifiedTypeName; - final StringBuilder sb = new StringBuilder(); - for (int i = 0; i < metricName.length(); i++) { - final char p = metricName.charAt(i); - if (!(p >= 'A' && p <= 'Z') && !(p >= 'a' && p <= 'z') && !(p >= '0' && p <= '9') && (p != '_') && (p != '-') - && (p != '.') && (p != '\0')) { - sb.append('_'); - } else { - sb.append(p); - } - } - return sb.toString(); - } - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java deleted file mode 100644 index 7f6c5c9..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.metrics2.sink.kafka; - -import kafka.metrics.KafkaMetricsReporterMBean; - -public interface KafkaTimelineMetricsReporterMBean extends KafkaMetricsReporterMBean { - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java deleted file mode 100644 index f4f8333..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.metrics2.sink.kafka; - -import java.io.Closeable; -import java.util.Locale; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yammer.metrics.core.Metric; -import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; -import com.yammer.metrics.reporting.ConsoleReporter; -import com.yammer.metrics.reporting.CsvReporter; - -/** - * The abstract base class for all scheduled reporters (i.e., reporters which - * process a registry's metrics periodically). - * - * @see ConsoleReporter - * @see CsvReporter - * @see Slf4jReporter - */ -public abstract class ScheduledReporter implements Closeable { - - private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class); - - /** - * A simple named thread factory. - */ - @SuppressWarnings("NullableProblems") - private static class NamedThreadFactory implements ThreadFactory { - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - private NamedThreadFactory(String name) { - final SecurityManager s = System.getSecurityManager(); - this.group = (s != null) ? 
s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - this.namePrefix = "metrics-" + name + "-thread-"; - } - - @Override - public Thread newThread(Runnable r) { - final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - } - - private static final AtomicInteger FACTORY_ID = new AtomicInteger(); - - private final MetricsRegistry registry; - private final ScheduledExecutorService executor; - private final double durationFactor; - private final String durationUnit; - private final double rateFactor; - private final String rateUnit; - - /** - * Creates a new {@link ScheduledReporter} instance. - * - * @param registry - * the {@link com.codahale.metrics.MetricRegistry} containing the - * metrics this reporter will report - * @param name - * the reporter's name - * @param rateUnit - * a unit of time - * @param durationUnit - * a unit of time - */ - protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) { - this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-' - + FACTORY_ID.incrementAndGet()))); - } - - /** - * Creates a new {@link ScheduledReporter} instance. - * - * @param registry - * the {@link com.codahale.metrics.MetricRegistry} containing the - * metrics this reporter will report - * @param executor - * the executor to use while scheduling reporting of metrics. 
- */ - protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit, - ScheduledExecutorService executor) { - this.registry = registry; - this.executor = executor; - this.rateFactor = rateUnit.toSeconds(1); - this.rateUnit = calculateRateUnit(rateUnit); - this.durationFactor = 1.0 / durationUnit.toNanos(1); - this.durationUnit = durationUnit.toString().toLowerCase(Locale.US); - } - - /** - * Starts the reporter polling at the given period. - * - * @param period - * the amount of time between polls - * @param unit - * the unit for {@code period} - */ - public void start(long period, TimeUnit unit) { - executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - report(); - } catch (RuntimeException ex) { - LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this - .getClass().getSimpleName(), ex); - } - } - }, period, period, unit); - } - - /** - * Stops the reporter and shuts down its thread of execution. - * - * Uses the shutdown pattern from - * http://docs.oracle.com/javase/7/docs/api/java - * /util/concurrent/ExecutorService.html - */ - public void stop() { - executor.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - executor.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate"); - } - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - /** - * Stops the reporter and shuts down its thread of execution. 
- */ - @Override - public void close() { - stop(); - } - - /** - * Report the current values of all metrics in the registry. - */ - public void report() { - synchronized (this) { - report(registry.allMetrics().entrySet()); - } - } - - /** - * Called periodically by the polling thread. Subclasses should report all the - * given metrics. - * - * @param metrics - * all the metrics in the registry - */ - public abstract void report(Set<Entry<MetricName, Metric>> metrics); - - protected String getRateUnit() { - return rateUnit; - } - - protected String getDurationUnit() { - return durationUnit; - } - - protected double convertDuration(double duration) { - return duration * durationFactor; - } - - protected double convertRate(double rate) { - return rate * rateFactor; - } - - private String calculateRateUnit(TimeUnit unit) { - final String s = unit.toString().toLowerCase(Locale.US); - return s.substring(0, s.length() - 1); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java deleted file mode 100644 index 67c61e1..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metrics2.sink.kafka; - -import static org.mockito.Mockito.mock; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replay; -import static org.powermock.api.easymock.PowerMock.verifyAll; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import kafka.utils.VerifiableProperties; - -import org.apache.commons.httpclient.HttpClient; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Counter; -import com.yammer.metrics.core.Gauge; -import com.yammer.metrics.core.Histogram; -import com.yammer.metrics.core.Meter; -import com.yammer.metrics.core.Metric; -import com.yammer.metrics.core.MetricsRegistry; -import com.yammer.metrics.core.Timer; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ Metrics.class, HttpClient.class }) -@PowerMockIgnore("javax.management.*") 
-public class KafkaTimelineMetricsReporterTest { - - private final List<Metric> list = new ArrayList<Metric>(); - private final MetricsRegistry registry = new MetricsRegistry(); - @SuppressWarnings("rawtypes") - private final Gauge gauge = mock(Gauge.class); - private final KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter = new KafkaTimelineMetricsReporter(); - private VerifiableProperties props; - - @Before - public void setUp() throws Exception { - @SuppressWarnings({ "rawtypes", "unchecked" }) - Gauge g = registry.newGauge(System.class, "gauge", gauge); - Counter counter = registry.newCounter(System.class, "counter"); - Histogram histogram = registry.newHistogram(System.class, "histogram"); - Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS); - Timer timer = registry.newTimer(System.class, "timer"); - list.add(g); - list.add(counter); - list.add(histogram); - list.add(meter); - list.add(timer); - Properties properties = new Properties(); - properties.setProperty("kafka.timeline.metrics.sendInterval", "5900"); - properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000"); - properties.setProperty("kafka.timeline.metrics.host", "localhost"); - properties.setProperty("kafka.timeline.metrics.port", "8188"); - properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true"); - props = new VerifiableProperties(properties); - } - - @Test - public void testReporterStartStop() { - mockStatic(Metrics.class); - EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2); - TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter); - kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache); - HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class); - kafkaTimelineMetricsReporter.setHttpClient(httpClient); - replay(Metrics.class, httpClient, timelineMetricsCache); - kafkaTimelineMetricsReporter.init(props); - 
kafkaTimelineMetricsReporter.stopReporter(); - verifyAll(); - } - - private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) { - TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class); - kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache); - EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new TimelineMetric()).once(); - timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class)); - EasyMock.expectLastCall().once(); - return timelineMetricsCache; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java deleted file mode 100644 index 41f9126..0000000 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metrics2.sink.kafka; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.yammer.metrics.core.Counter; -import com.yammer.metrics.core.Gauge; -import com.yammer.metrics.core.Histogram; -import com.yammer.metrics.core.Meter; -import com.yammer.metrics.core.Metric; -import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; -import com.yammer.metrics.core.Timer; - -public class ScheduledReporterTest { - private final Gauge gauge = mock(Gauge.class); - private final List<Metric> list = new ArrayList<Metric>(); - private final MetricsRegistry registry = new MetricsRegistry(); - private final ScheduledReporter reporter = spy(new ScheduledReporter(registry, "example", TimeUnit.SECONDS, - TimeUnit.MILLISECONDS) { - @Override - public void report(Set<Entry<MetricName, Metric>> metrics) { - // nothing doing! 
- } - }); - - @SuppressWarnings("unchecked") - @Before - public void setUp() throws Exception { - Gauge g = registry.newGauge(System.class, "gauge", gauge); - Counter counter = registry.newCounter(System.class, "counter"); - Histogram histogram = registry.newHistogram(System.class, "histogram"); - Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS); - Timer timer = registry.newTimer(System.class, "timer"); - list.add(g); - list.add(counter); - list.add(histogram); - list.add(meter); - list.add(timer); - reporter.start(200, TimeUnit.MILLISECONDS); - } - - @After - public void tearDown() throws Exception { - reporter.stop(); - } - - @Test - public void pollsPeriodically() throws Exception { - Thread.sleep(500); - verify(reporter, times(2)).report(set(list)); - } - - private Set<Entry<MetricName, Metric>> set(List<Metric> metrics) { - final Map<MetricName, Metric> map = new HashMap<MetricName, Metric>(); - for (Metric metric : metrics) { - String name = null; - if (metric instanceof Gauge) { - name = "gauge"; - } else if (metric instanceof Counter) { - name = "counter"; - } else if (metric instanceof Histogram) { - name = "histogram"; - } else if (metric instanceof Meter) { - name = "meter"; - } else if (metric instanceof Timer) { - name = "timer"; - } - map.put(new MetricName(System.class, name), metric); - } - return map.entrySet(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index 22067d9..d99cc82 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -14,7 +14,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> <groupId>org.apache.ambari</groupId> <modelVersion>4.0.0</modelVersion> @@ -25,7 +29,6 @@ <module>ambari-metrics-common</module> <module>ambari-metrics-hadoop-sink</module> <module>ambari-metrics-flume-sink</module> - <module>ambari-metrics-kafka-sink</module> <module>ambari-metrics-storm-sink</module> <module>ambari-metrics-timelineservice</module> <module>ambari-metrics-host-monitoring</module> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml index 9c11007..dc1b6b4 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml @@ -101,7 +101,7 @@ <name>num.partitions</name> <value>1</value> <description> - The default number of partitions per topic. + The default number of partitions per topic. 
</description> </property> <property> @@ -291,9 +291,9 @@ </property> <property> <name>kafka.metrics.reporters</name> - <value>{{kafka_metrics_reporters}}</value> + <value>kafka.ganglia.KafkaGangliaMetricsReporter</value> <description> - kafka ganglia metrics reporter and kafka timeline metrics reporter + kafka ganglia metrics reporter </description> </property> <property> @@ -318,29 +318,4 @@ <value>kafka</value> <description>Ganglia group name </description> </property> - <property> - <name>kafka.timeline.metrics.reporter.enabled</name> - <value>true</value> - <description>Kafka timeline metrics reporter enable</description> - </property> - <property> - <name>kafka.timeline.metrics.host</name> - <value>{{metric_collector_host}}</value> - <description>Timeline host</description> - </property> - <property> - <name>kafka.timeline.metrics.port</name> - <value>{{metric_collector_port}}</value> - <description>Timeline port</description> - </property> - <property> - <name>kafka.timeline.metrics.reporter.sendInterval</name> - <value>5900</value> - <description>Timeline metrics reporter send interval</description> - </property> - <property> - <name>kafka.timeline.metrics.maxRowCacheSize</name> - <value>10000</value> - <description>Timeline metrics reporter send interval</description> - </property> </configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml index f322406..7ad4396 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml @@ -50,12 +50,6 
@@ # The java implementation to use. export JAVA_HOME={{java64_home}} export PATH=$PATH:$JAVA_HOME/bin - -# Add kafka sink to classpath and related depenencies -if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then - export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar - export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/* -fi </value> </property> </configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py index 33ee47a..c0231a8 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py @@ -35,9 +35,6 @@ def kafka(): kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker']) kafka_server_config['broker.id'] = brokerid kafka_server_config['host.name'] = params.hostname - kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host - kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port - kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters kafka_data_dir = kafka_server_config['log.dirs'] Directory(filter(None,kafka_data_dir.split(",")), owner=params.kafka_user, http://git-wip-us.apache.org/repos/asf/ambari/blob/f1d354cc/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py index 800ccc4..067e537 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py @@ -60,30 +60,3 @@ if (('kafka-log4j' in config['configurations']) and ('content' in config['config log4j_props = config['configurations']['kafka-log4j']['content'] else: log4j_props = None - -if 'ganglia_server_host' in config['clusterHostInfo'] and \ - len(config['clusterHostInfo']['ganglia_server_host'])>0: - ganglia_installed = True - ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0] - ganglia_report_interval = 60 -else: - ganglia_installed = False - -kafka_metrics_reporters="" - -if ganglia_installed: - kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter" - -ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", []) -has_metric_collector = not len(ams_collector_hosts) == 0 - -if has_metric_collector: - metric_collector_host = ams_collector_hosts[0] - metric_collector_port = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:8188") - if metric_collector_port and metric_collector_port.find(':') != -1: - metric_collector_port = metric_collector_port.split(':')[1] - - if not len(kafka_metrics_reporters) == 0: - kafka_metrics_reporters = kafka_metrics_reporters + ',' - - kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"
